AWS SDK for Go で Amazon S3 Select を試してみた

2018.08.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon S3 Select (以降S3 Select)はAmazon S3に保存したオブジェクト(CSVまたはJSONファイル)からSQLでデータを抽出できるサービスです。 サービスのリリース当時はS3 Selectに対応したAWS SDKはJavaとPythonのみでしたが、ここ数ヶ月間でその他SDKでの対応がアナウンスされています。

今回のエントリーではAWS SDK for GoでS3 Selectを試してみた様子をご紹介します。

検証環境

  • OS : macOS High Sierra v10.13.6
  • Golang : v1.10.3
  • AWS SDK for Go : v1.15.0

CSVファイルからのデータ抽出

サンプルコード

package main

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func exitErrorf(msg string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, msg+"\n", args...)
	os.Exit(1)
}

func main() {

	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
	svc := s3.New(sess)

	params := &s3.SelectObjectContentInput{
		Bucket:          aws.String("s3-select-sampleyy"),
		Key:             aws.String("sample.csv"),
		ExpressionType:  aws.String(s3.ExpressionTypeSql),
		Expression:      aws.String("SELECT * FROM S3Object LIMIT 10"),
		RequestProgress: &s3.RequestProgress{},
		InputSerialization: &s3.InputSerialization{
			CompressionType: aws.String("NONE"),
			CSV: &s3.CSVInput{
				FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			CSV: &s3.CSVOutput{},
		},
	}

	resp, err := svc.SelectObjectContent(params)
	if err != nil {
		exitErrorf("failed making API request", err)
	}
	defer resp.EventStream.Close()

	for event := range resp.EventStream.Events() {
		// メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す
		v, ok := event.(*s3.RecordsEvent)
		if ok {
			fmt.Println(string(v.Payload))
		}
	}

	if err := resp.EventStream.Err(); err != nil {
		exitErrorf("failed to read from SelectObject EventStream, %v", err)
	}
}

リクエストパラメータ

必須パラメータは以下の通りです。

  • Bucket : S3バケット名
  • Key : オブジェクトキー
  • ExpressionType : クエリ言語。 SQL を指定。
  • Expression : 実行するSQL文
  • InputSerialization : クエリ対象ファイルのフォーマットを指定
  • CSV : CSVのフォーマットを指定
  • FileHeaderInfo : USEを指定すると、1行目をカラムヘッダとして扱う
  • OutputSerialization : アウトプットデータのフォーマットを指定
  • CSV : CSVのフォーマットを指定

その他のパラメータについてはAWS SDK for Go API Referenceを参照ください。

レスポンス

S3からのレスポンスは複数のチャンク(メッセージ)に分割されて送信されます。メッセージは以下の6種類です。

メッセージタイプ 説明
Records レコード(抽出されたデータ)が含まれるメッセージ
ContStatsinuation TCPコネクションを維持するためS3から定期的に送信されるメッセージ
Progress クエリの進捗状況が含まれるメッセージ。リクエストに「RequestProgress = yes」を含めることで同メッセージがS3から定期的に送信されるようになる
Stats クエリの統計情報が含まれるメッセージ
End S3から全データの送信が完了したことを示すメッセージ
RequestLevelError リクエストでエラーが発生したことを示すメッセージ。エラーコードとエラーメッセージが含まれる。このメッセージが送信された場合、Endメッセージは送信されない

Progress messageStats message に含まれる情報は以下の3つです。

項目 説明
BytesScanned スキャンされたデータのbyte数(圧縮ファイルの展開前)
BytesProcessed 処理されたデータのbyte数(圧縮ファイルの展開後)
BytesReturned S3から送信された抽出データのbyte数

※ 圧縮されていないファイルでは、BytesScannedBytesProcessed は同じ値になります。

レスポンス処理の具体的な実装としては、レスポンスの EventStreamEventsでメッセージタイプを得られるので、メッセージタイプが Records の場合にデータを取り出すようにします。

上記のコード例では以下の部分がこれに該当します。

for event := range resp.EventStream.Events() {
	// メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す
	v, ok := event.(*s3.RecordsEvent)
	if ok {
		fmt.Println(string(v.Payload))
	}
}

メッセージタイプのよって処理を分岐させたい場合は以下のように switch を使うとよいでしょう。

for event := range resp.EventStream.Events() {
	switch v := event.(type) {
	case *s3.RecordsEvent:
		fmt.Println(string(v.Payload))
	case *s3.StatsEvent:
		fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
		fmt.Println("Returned", *v.Details.BytesReturned, "bytes")
		fmt.Println("Scanned", *v.Details.BytesScanned, "bytes")
	case *s3.EndEvent:
		fmt.Println("SelectObjectContent completed")
	}
}

JSONファイルからのデータ抽出

サンプルコード

package main

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func exitErrorf(msg string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, msg+"\n", args...)
	os.Exit(1)
}

func main() {

	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
	svc := s3.New(sess)

	params := &s3.SelectObjectContentInput{
		Bucket:         aws.String("s3-select-sampleyy"),
		Key:            aws.String("sample.json"),
		ExpressionType: aws.String(s3.ExpressionTypeSql),
		Expression:     aws.String("SELECT * FROM S3Object LIMIT 10"),
		InputSerialization: &s3.InputSerialization{
			CompressionType: aws.String("NONE"),
			JSON: &s3.JSONInput{
				Type: aws.String("Lines"),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			JSON: &s3.JSONOutput{},
		},
	}

	resp, err := svc.SelectObjectContent(params)
	if err != nil {
		exitErrorf("failed making API request", err)
	}
	defer resp.EventStream.Close()

	for event := range resp.EventStream.Events() {
		switch v := event.(type) {
		case *s3.RecordsEvent:
			fmt.Println(string(v.Payload))
		case *s3.StatsEvent:
			fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
			fmt.Println("Returned", *v.Details.BytesReturned, "bytes")
			fmt.Println("Scanned", *v.Details.BytesScanned, "bytes")
		case *s3.EndEvent:
			fmt.Println("SelectObjectContent completed")
		}
	}

	if err := resp.EventStream.Err(); err != nil {
		exitErrorf("failed to read from SelectObject EventStream, %v", err)
	}
}

リクエストパラメータ

InputSerializationJSON を指定します。 JSONの Type については以下のブログ記事を参照ください。

Amazon S3 Select で扱う JSON データの形式と Type 指定について

ストリーム処理

AWS Developer Blogには、io.Pipeを使ってS3 Select取得したデータをcsv.Readerに渡す例が掲載されています。こちらを参考に、S3 Selectで取得したデータをgzipで圧縮してS3にアップロードするコードを書いてみました。(S3にアップロードされたRAWデータをS3 Selectで加工 → gzipに圧縮 → Amazon Redshiftにロード、といったユースケースをイメージしています。)

package main

import (
	"fmt"
	"os"

	"compress/gzip"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func exitErrorf(msg string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, msg+"\n", args...)
	os.Exit(1)
}

func main() {
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
	svc := s3.New(sess)

	params := &s3.SelectObjectContentInput{
		Bucket:         aws.String("s3-select-sampleyy"),
		Key:            aws.String("sample.csv"),
		ExpressionType: aws.String(s3.ExpressionTypeSql),
		Expression:     aws.String("SELECT * FROM S3Object LIMIT 10"),
		InputSerialization: &s3.InputSerialization{
			CompressionType: aws.String("NONE"),
			CSV: &s3.CSVInput{
				FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			CSV: &s3.CSVOutput{},
		},
	}

	resp, err := svc.SelectObjectContent(params)
	if err != nil {
		exitErrorf("failed making API request", err)
	}
	defer resp.EventStream.Close()

	pr, pw := io.Pipe()

	go func() {
		gw := gzip.NewWriter(pw)
		defer pw.Close()
		defer gw.Close()
		for event := range resp.EventStream.Events() {
			switch e := event.(type) {
			case *s3.RecordsEvent:
				gw.Write(e.Payload)
			case *s3.StatsEvent:
				fmt.Printf("Processed %d bytes\n", e.Details.BytesProcessed)
			}
		}
	}()

	uploader := s3manager.NewUploader(sess)
	key := "upload.csv.gzip"
	result, err := uploader.Upload(&s3manager.UploadInput{
		Body:   pr,
		Bucket: aws.String("s3-select-sampleyy"),
		Key:    aws.String(key),
	})

	if err != nil {
		exitErrorf("failed to upload %s\n, %v", key, err)
	}

	fmt.Printf("Successfully uploaded %s to %s\n", key, result.Location)
}

まとめ

Amazon S3 Selectは、SQLでデータの一部のみを高速に抽出できるという特徴から、従来はGet ObjectでS3からファイルをダウンロード&データ抽出していた処理の置き換えや、データ分析のETL処理での利用等が期待されます。

冒頭でご紹介したAWS Developer Blogには各AWS SDKでのコード例が掲載されていますので、皆さんもぜひお好みの言語でAmazon S3 Selectを試してみたください。