GolangでのETL処理 ratchetでのファイル読み込み〜出力まで

2017.06.05

はじめに

GolangでETL処理を行うためのライブラリとしてratchetがあります。このライブラリは"A library for performing data pipeline / ETL tasks in Go."とあるように、パイプラインやETLタスクをGolangで実行するためのものです。

使用方法はドキュメントにexampleもあるのですが、今回はこれを元に実際のファイルからデータの読み取り・変換・出力を行ってみました。

サンプルの実装について

先に書いたように、実際のファイルからのETL処理のサンプルとなります。処理の流れとしては

  • hello_world.txtからデータを読み取り
  • 読み取ったデータ内のスペースをカンマに変換
  • 変換したデータをhello_world.csvに出力

となります。以下、そのソースと説明です。

package main

import (
	"log"
	"os"
	"strings"

	"github.com/dailyburn/ratchet"
	"github.com/dailyburn/ratchet/data"
	"github.com/dailyburn/ratchet/processors"
)

type myTransformer struct{}

// 変換処理を実装
// DataProcessor interfaceに則り実装
func (t *myTransformer) ProcessData(d data.JSON,
	outputChan chan data.JSON,
	killChan chan error) {
	b := []byte(d)
	s := string(b)
	r := strings.Replace(s, " ", ",", -1)
	outputChan <- []byte(r)
}

// DataProcessor interfaceに則り実装
func (t *myTransformer) Finish(outputChan chan data.JSON, killChan chan error) {
}

// New MyTransformer
func newMyTransformer() *myTransformer {
	return &myTransformer{}
}

// Writer作成
func createWriter(fileName string) *processors.IoWriter {
	file, err := os.Create(fileName)
	if err != nil {
		log.Fatal(err)
	}

	return processors.NewIoWriter(file)
}

func main() {
	reader := processors.NewFileReader("hello_world.txt")
	trans := newMyTransformer()
	writer := createWriter("hello_world.csv")

	pipeline := ratchet.NewPipeline(reader, trans, writer) // 可変長引数でDataProcessorを複数渡せる
	err := <-pipeline.Run()
	if err != nil {
		log.Fatal(err)
	}
}

main処理

読み込み・変換・出力を行うDataProcessorを、それぞれ「reader」「trans」「writer」という変数に先ず代入します。後でも説明しますが、これらは全てDataProcessor Interfaceを保持しています。

これらのDataProcessorをPipelineに登録し、Run()で実行します。

読み込みについて

ファイルからの読み込みはratchetに用意されている「FileReader」を使用します。引数に読み取る対象のファイル名を渡しているだけです。

変換について

変換処理については独自に実装するため、DataProcessor Interfaceを持つstrunctを作成しました。「myTransformer」がそれで、DataProcessor Interfaceに則り「ProcessData」「Finish」メソッドを持ちます。

「ProcessData」メソッド内にはデータの変換処理を実装しています。前処理となるFileReaderにて読み取ったファイル内のデータを、引数「d」より取得し、変換後に「output」チャネルに登録しています。この

  • 入力は引数から取得
  • 出力はチャネルに登録

というのが、ratchetのDataProcessorを実装する上での一つの特徴になるかと思います。

出力について

出力処理にはついてratchetに用意されている「IoWriter」を使用していますが、その準備は「createWriter」メソッドに纏めてあります。

まとめ

読み込み・変換・出力を行うDataProcessorをPipelineに登録して実行する、というratchetの処理の流れを理解することができました。また変換処理を実装するためにratchet内のDataProcessorのソースを読んだのですが、Golangの特徴もあってか非常に読みやすいという印象を受けました。何かの参考になれば幸いです。

参考サイト

ratchet
Data Pipeline and ETL tasks in Go using Ratchet
package ratchet