AWS SDK for Go で Amazon S3 Select を試してみた
Amazon S3 Select (以降S3 Select)はAmazon S3に保存したオブジェクト(CSVまたはJSONファイル)からSQLでデータを抽出できるサービスです。 サービスのリリース当時はS3 Selectに対応したAWS SDKはJavaとPythonのみでしたが、ここ数ヶ月間でその他SDKでの対応がアナウンスされています。
- Introducing support for Amazon S3 Select in the AWS SDK for Ruby | AWS Developer Blog
- Introducing support for Amazon S3 Select in the AWS SDK for Go | AWS Developer Blog
- Introducing support for Amazon S3 Select in the AWS SDK for JavaScript | AWS Developer Blog
- Amazon S3 Select Support in the AWS SDK for .NET | AWS Developer Blog
今回のエントリーではAWS SDK for GoでS3 Selectを試してみた様子をご紹介します。
検証環境
- OS : macOS High Sierra v10.13.6
- Golang : v1.10.3
- AWS SDK for Go : v1.15.0
CSVファイルからのデータ抽出
サンプルコード
package main import ( "fmt" "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" ) func exitErrorf(msg string, args ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", args...) os.Exit(1) } func main() { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1"))) svc := s3.New(sess) params := &s3.SelectObjectContentInput{ Bucket: aws.String("s3-select-sampleyy"), Key: aws.String("sample.csv"), ExpressionType: aws.String(s3.ExpressionTypeSql), Expression: aws.String("SELECT * FROM S3Object LIMIT 10"), RequestProgress: &s3.RequestProgress{}, InputSerialization: &s3.InputSerialization{ CompressionType: aws.String("NONE"), CSV: &s3.CSVInput{ FileHeaderInfo: aws.String(s3.FileHeaderInfoUse), }, }, OutputSerialization: &s3.OutputSerialization{ CSV: &s3.CSVOutput{}, }, } resp, err := svc.SelectObjectContent(params) if err != nil { exitErrorf("failed making API request", err) } defer resp.EventStream.Close() for event := range resp.EventStream.Events() { // メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す v, ok := event.(*s3.RecordsEvent) if ok { fmt.Println(string(v.Payload)) } } if err := resp.EventStream.Err(); err != nil { exitErrorf("failed to read from SelectObject EventStream, %v", err) } }
リクエストパラメータ
必須パラメータは以下の通りです。
Bucket
: S3バケット名Key
: オブジェクトキーExpressionType
: クエリ言語。SQL
を指定。Expression
: 実行するSQL文InputSerialization
: クエリ対象ファイルのフォーマットを指定CSV
: CSVのフォーマットを指定FileHeaderInfo
:USE
を指定すると、1行目をカラムヘッダとして扱う- OutputSerialization : アウトプットデータのフォーマットを指定
CSV
: CSVのフォーマットを指定
その他のパラメータについてはAWS SDK for Go API Referenceを参照ください。
レスポンス
S3からのレスポンスは複数のチャンク(メッセージ)に分割されて送信されます。メッセージは以下の6種類です。
メッセージタイプ | 説明 |
---|---|
Records | レコード(抽出されたデータ)が含まれるメッセージ |
ContStatsinuation | TCPコネクションを維持するためS3から定期的に送信されるメッセージ |
Progress | クエリの進捗状況が含まれるメッセージ。リクエストに「RequestProgress = yes」を含めることで同メッセージがS3から定期的に送信されるようになる |
Stats | クエリの統計情報が含まれるメッセージ |
End | S3から全データの送信が完了したことを示すメッセージ |
RequestLevelError | リクエストでエラーが発生したことを示すメッセージ。エラーコードとエラーメッセージが含まれる。このメッセージが送信された場合、Endメッセージは送信されない |
Progress message
と Stats message
に含まれる情報は以下の3つです。
項目 | 説明 |
---|---|
BytesScanned | スキャンされたデータのbyte数(圧縮ファイルの展開前) |
BytesProcessed | 処理されたデータのbyte数(圧縮ファイルの展開後) |
BytesReturned | S3から送信された抽出データのbyte数 |
※ 圧縮されていないファイルでは、BytesScanned
と BytesProcessed
は同じ値になります。
レスポンス処理の具体的な実装としては、レスポンスの EventStream
の Events
でメッセージタイプを得られるので、メッセージタイプが Records
の場合にデータを取り出すようにします。
上記のコード例では以下の部分がこれに該当します。
for event := range resp.EventStream.Events() { // メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す v, ok := event.(*s3.RecordsEvent) if ok { fmt.Println(string(v.Payload)) } }
メッセージタイプのよって処理を分岐させたい場合は以下のように switch
を使うとよいでしょう。
for event := range resp.EventStream.Events() { switch v := event.(type) { case *s3.RecordsEvent: fmt.Println(string(v.Payload)) case *s3.StatsEvent: fmt.Println("Processed", *v.Details.BytesProcessed, "bytes") fmt.Println("Returned", *v.Details.BytesReturned, "bytes") fmt.Println("Scanned", *v.Details.BytesScanned, "bytes") case *s3.EndEvent: fmt.Println("SelectObjectContent completed") } }
JSONファイルからのデータ抽出
サンプルコード
package main import ( "fmt" "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" ) func exitErrorf(msg string, args ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", args...) os.Exit(1) } func main() { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1"))) svc := s3.New(sess) params := &s3.SelectObjectContentInput{ Bucket: aws.String("s3-select-sampleyy"), Key: aws.String("sample.json"), ExpressionType: aws.String(s3.ExpressionTypeSql), Expression: aws.String("SELECT * FROM S3Object LIMIT 10"), InputSerialization: &s3.InputSerialization{ CompressionType: aws.String("NONE"), JSON: &s3.JSONInput{ Type: aws.String("Lines"), }, }, OutputSerialization: &s3.OutputSerialization{ JSON: &s3.JSONOutput{}, }, } resp, err := svc.SelectObjectContent(params) if err != nil { exitErrorf("failed making API request", err) } defer resp.EventStream.Close() for event := range resp.EventStream.Events() { switch v := event.(type) { case *s3.RecordsEvent: fmt.Println(string(v.Payload)) case *s3.StatsEvent: fmt.Println("Processed", *v.Details.BytesProcessed, "bytes") fmt.Println("Returned", *v.Details.BytesReturned, "bytes") fmt.Println("Scanned", *v.Details.BytesScanned, "bytes") case *s3.EndEvent: fmt.Println("SelectObjectContent completed") } } if err := resp.EventStream.Err(); err != nil { exitErrorf("failed to read from SelectObject EventStream, %v", err) } }
リクエストパラメータ
InputSerialization
で JSON
を指定します。 JSONの Type
については以下のブログ記事を参照ください。
ストリーム処理
AWS Developer Blogには、io.Pipeを使ってS3 Select取得したデータをcsv.Readerに渡す例が掲載されています。こちらを参考に、S3 Selectで取得したデータをgzipで圧縮してS3にアップロードするコードを書いてみました。(S3にアップロードされたRAWデータをS3 Selectで加工 → gzipに圧縮 → Amazon Redshiftにロード、といったユースケースをイメージしています。)
package main import ( "fmt" "os" "compress/gzip" "io" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) func exitErrorf(msg string, args ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", args...) os.Exit(1) } func main() { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1"))) svc := s3.New(sess) params := &s3.SelectObjectContentInput{ Bucket: aws.String("s3-select-sampleyy"), Key: aws.String("sample.csv"), ExpressionType: aws.String(s3.ExpressionTypeSql), Expression: aws.String("SELECT * FROM S3Object LIMIT 10"), InputSerialization: &s3.InputSerialization{ CompressionType: aws.String("NONE"), CSV: &s3.CSVInput{ FileHeaderInfo: aws.String(s3.FileHeaderInfoUse), }, }, OutputSerialization: &s3.OutputSerialization{ CSV: &s3.CSVOutput{}, }, } resp, err := svc.SelectObjectContent(params) if err != nil { exitErrorf("failed making API request", err) } defer resp.EventStream.Close() pr, pw := io.Pipe() go func() { gw := gzip.NewWriter(pw) defer pw.Close() defer gw.Close() for event := range resp.EventStream.Events() { switch e := event.(type) { case *s3.RecordsEvent: gw.Write(e.Payload) case *s3.StatsEvent: fmt.Printf("Processed %d bytes\n", e.Details.BytesProcessed) } } }() uploader := s3manager.NewUploader(sess) key := "upload.csv.gzip" result, err := uploader.Upload(&s3manager.UploadInput{ Body: pr, Bucket: aws.String("s3-select-sampleyy"), Key: aws.String(key), }) if err != nil { exitErrorf("failed to upload %s\n, %v", key, err) } fmt.Printf("Successfully uploaded %s to %s\n", key, result.Location) }
まとめ
Amazon S3 Selectは、SQLでデータの一部のみを高速に抽出できるという特徴から、従来はGet ObjectでS3からファイルをダウンロード&データ抽出していた処理の置き換えや、データ分析のETL処理での利用等が期待されます。
冒頭でご紹介したAWS Developer Blogには各AWS SDKでのコード例が掲載されていますので、皆さんもぜひお好みの言語でAmazon S3 Selectを試してみたください。