GolangでのETL処理 ratchetでのファイル読み込み〜出力まで
はじめに
GolangでETL処理を行うためのライブラリとしてratchetがあります。このライブラリは"A library for performing data pipeline / ETL tasks in Go."とあるように、パイプラインやETLタスクをGolangで実行するためのものです。
使用方法はドキュメントにexampleもあるのですが、今回はこれを元に実際のファイルからデータの読み取り・変換・出力を行ってみました。
サンプルの実装について
先に書いたように、実際のファイルからのETL処理のサンプルとなります。処理の流れとしては
- hello_world.txtからデータを読み取り
- 読み取ったデータ内のスペースをカンマに変換
- 変換したデータをhello_world.csvに出力
となります。以下、そのソースと説明です。
package main import ( "log" "os" "strings" "github.com/dailyburn/ratchet" "github.com/dailyburn/ratchet/data" "github.com/dailyburn/ratchet/processors" ) type myTransformer struct{} // 変換処理を実装 // DataProcessor interfaceに則り実装 func (t *myTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) { b := []byte(d) s := string(b) r := strings.Replace(s, " ", ",", -1) outputChan <- []byte(r) } // DataProcessor interfaceに則り実装 func (t *myTransformer) Finish(outputChan chan data.JSON, killChan chan error) { } // New MyTransformer func newMyTransformer() *myTransformer { return &myTransformer{} } // Writer作成 func createWriter(fileName string) *processors.IoWriter { file, err := os.Create(fileName) if err != nil { log.Fatal(err) } return processors.NewIoWriter(file) } func main() { reader := processors.NewFileReader("hello_world.txt") trans := newMyTransformer() writer := createWriter("hello_world.csv") pipeline := ratchet.NewPipeline(reader, trans, writer) // 可変長引数でDataProcessorを複数渡せる err := <-pipeline.Run() if err != nil { log.Fatal(err) } }
main処理
読み込み・変換・出力を行うDataProcessorを、それぞれ「reader」「trans」「writer」という変数に先ず代入します。後でも説明しますが、これらは全てDataProcessor Interfaceを保持しています。
これらのDataProcessorをPipelineに登録し、Run()で実行します。
読み込みについて
ファイルからの読み込みはratchetに用意されている「FileReader」を使用します。引数に読み取る対象のファイル名を渡しているだけです。
変換について
変換処理については独自に実装するため、DataProcessor Interfaceを持つstrunctを作成しました。「myTransformer」がそれで、DataProcessor Interfaceに則り「ProcessData」「Finish」メソッドを持ちます。
「ProcessData」メソッド内にはデータの変換処理を実装しています。前処理となるFileReaderにて読み取ったファイル内のデータを、引数「d」より取得し、変換後に「output」チャネルに登録しています。この
- 入力は引数から取得
- 出力はチャネルに登録
というのが、ratchetのDataProcessorを実装する上での一つの特徴になるかと思います。
出力について
出力処理にはついてratchetに用意されている「IoWriter」を使用していますが、その準備は「createWriter」メソッドに纏めてあります。
まとめ
読み込み・変換・出力を行うDataProcessorをPipelineに登録して実行する、というratchetの処理の流れを理解することができました。また変換処理を実装するためにratchet内のDataProcessorのソースを読んだのですが、Golangの特徴もあってか非常に読みやすいという印象を受けました。何かの参考になれば幸いです。
参考サイト
ratchet
Data Pipeline and ETL tasks in Go using Ratchet
package ratchet