この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
GolangでETL処理を行うためのライブラリとしてratchetがあります。このライブラリは"A library for performing data pipeline / ETL tasks in Go."とあるように、パイプラインやETLタスクをGolangで実行するためのものです。
使用方法はドキュメントにexampleもあるのですが、今回はこれを元に実際のファイルからデータの読み取り・変換・出力を行ってみました。
サンプルの実装について
先に書いたように、実際のファイルからのETL処理のサンプルとなります。処理の流れとしては
- hello_world.txtからデータを読み取り
- 読み取ったデータ内のスペースをカンマに変換
- 変換したデータをhello_world.csvに出力
となります。以下、そのソースと説明です。
package main
import (
"log"
"os"
"strings"
"github.com/dailyburn/ratchet"
"github.com/dailyburn/ratchet/data"
"github.com/dailyburn/ratchet/processors"
)
type myTransformer struct{}
// 変換処理を実装
// DataProcessor interfaceに則り実装
func (t *myTransformer) ProcessData(d data.JSON,
outputChan chan data.JSON,
killChan chan error) {
b := []byte(d)
s := string(b)
r := strings.Replace(s, " ", ",", -1)
outputChan <- []byte(r)
}
// DataProcessor interfaceに則り実装
func (t *myTransformer) Finish(outputChan chan data.JSON, killChan chan error) {
}
// New MyTransformer
func newMyTransformer() *myTransformer {
return &myTransformer{}
}
// Writer作成
func createWriter(fileName string) *processors.IoWriter {
file, err := os.Create(fileName)
if err != nil {
log.Fatal(err)
}
return processors.NewIoWriter(file)
}
func main() {
reader := processors.NewFileReader("hello_world.txt")
trans := newMyTransformer()
writer := createWriter("hello_world.csv")
pipeline := ratchet.NewPipeline(reader, trans, writer) // 可変長引数でDataProcessorを複数渡せる
err := <-pipeline.Run()
if err != nil {
log.Fatal(err)
}
}
main処理
読み込み・変換・出力を行うDataProcessorを、それぞれ「reader」「trans」「writer」という変数に先ず代入します。後でも説明しますが、これらは全てDataProcessor Interfaceを保持しています。
これらのDataProcessorをPipelineに登録し、Run()で実行します。
読み込みについて
ファイルからの読み込みはratchetに用意されている「FileReader」を使用します。引数に読み取る対象のファイル名を渡しているだけです。
変換について
変換処理については独自に実装するため、DataProcessor Interfaceを持つstrunctを作成しました。「myTransformer」がそれで、DataProcessor Interfaceに則り「ProcessData」「Finish」メソッドを持ちます。
「ProcessData」メソッド内にはデータの変換処理を実装しています。前処理となるFileReaderにて読み取ったファイル内のデータを、引数「d」より取得し、変換後に「output」チャネルに登録しています。この
- 入力は引数から取得
- 出力はチャネルに登録
というのが、ratchetのDataProcessorを実装する上での一つの特徴になるかと思います。
出力について
出力処理にはついてratchetに用意されている「IoWriter」を使用していますが、その準備は「createWriter」メソッドに纏めてあります。
まとめ
読み込み・変換・出力を行うDataProcessorをPipelineに登録して実行する、というratchetの処理の流れを理解することができました。また変換処理を実装するためにratchet内のDataProcessorのソースを読んだのですが、Golangの特徴もあってか非常に読みやすいという印象を受けました。何かの参考になれば幸いです。
参考サイト
ratchet
Data Pipeline and ETL tasks in Go using Ratchet
package ratchet