この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Amazon S3 Select (以降S3 Select)はAmazon S3に保存したオブジェクト(CSVまたはJSONファイル)からSQLでデータを抽出できるサービスです。 サービスのリリース当時はS3 Selectに対応したAWS SDKはJavaとPythonのみでしたが、ここ数ヶ月間でその他SDKでの対応がアナウンスされています。
- Introducing support for Amazon S3 Select in the AWS SDK for Ruby | AWS Developer Blog
- Introducing support for Amazon S3 Select in the AWS SDK for Go | AWS Developer Blog
- Introducing support for Amazon S3 Select in the AWS SDK for JavaScript | AWS Developer Blog
- Amazon S3 Select Support in the AWS SDK for .NET | AWS Developer Blog
今回のエントリーではAWS SDK for GoでS3 Selectを試してみた様子をご紹介します。
検証環境
- OS : macOS High Sierra v10.13.6
- Golang : v1.10.3
- AWS SDK for Go : v1.15.0
CSVファイルからのデータ抽出
サンプルコード
package main
import (
"fmt"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
func exitErrorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", args...)
os.Exit(1)
}
func main() {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
svc := s3.New(sess)
params := &s3.SelectObjectContentInput{
Bucket: aws.String("s3-select-sampleyy"),
Key: aws.String("sample.csv"),
ExpressionType: aws.String(s3.ExpressionTypeSql),
Expression: aws.String("SELECT * FROM S3Object LIMIT 10"),
RequestProgress: &s3.RequestProgress{},
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String("NONE"),
CSV: &s3.CSVInput{
FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
},
},
OutputSerialization: &s3.OutputSerialization{
CSV: &s3.CSVOutput{},
},
}
resp, err := svc.SelectObjectContent(params)
if err != nil {
exitErrorf("failed making API request", err)
}
defer resp.EventStream.Close()
for event := range resp.EventStream.Events() {
// メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す
v, ok := event.(*s3.RecordsEvent)
if ok {
fmt.Println(string(v.Payload))
}
}
if err := resp.EventStream.Err(); err != nil {
exitErrorf("failed to read from SelectObject EventStream, %v", err)
}
}
リクエストパラメータ
必須パラメータは以下の通りです。
Bucket
: S3バケット名Key
: オブジェクトキーExpressionType
: クエリ言語。SQL
を指定。Expression
: 実行するSQL文InputSerialization
: クエリ対象ファイルのフォーマットを指定CSV
: CSVのフォーマットを指定FileHeaderInfo
:USE
を指定すると、1行目をカラムヘッダとして扱う- OutputSerialization : アウトプットデータのフォーマットを指定
CSV
: CSVのフォーマットを指定
その他のパラメータについてはAWS SDK for Go API Referenceを参照ください。
レスポンス
S3からのレスポンスは複数のチャンク(メッセージ)に分割されて送信されます。メッセージは以下の6種類です。
メッセージタイプ | 説明 |
---|---|
Records | レコード(抽出されたデータ)が含まれるメッセージ |
ContStatsinuation | TCPコネクションを維持するためS3から定期的に送信されるメッセージ |
Progress | クエリの進捗状況が含まれるメッセージ。リクエストに「RequestProgress = yes」を含めることで同メッセージがS3から定期的に送信されるようになる |
Stats | クエリの統計情報が含まれるメッセージ |
End | S3から全データの送信が完了したことを示すメッセージ |
RequestLevelError | リクエストでエラーが発生したことを示すメッセージ。エラーコードとエラーメッセージが含まれる。このメッセージが送信された場合、Endメッセージは送信されない |
Progress message
と Stats message
に含まれる情報は以下の3つです。
項目 | 説明 |
---|---|
BytesScanned | スキャンされたデータのbyte数(圧縮ファイルの展開前) |
BytesProcessed | 処理されたデータのbyte数(圧縮ファイルの展開後) |
BytesReturned | S3から送信された抽出データのbyte数 |
※ 圧縮されていないファイルでは、BytesScanned
と BytesProcessed
は同じ値になります。
レスポンス処理の具体的な実装としては、レスポンスの EventStream
の Events
でメッセージタイプを得られるので、メッセージタイプが Records
の場合にデータを取り出すようにします。
上記のコード例では以下の部分がこれに該当します。
for event := range resp.EventStream.Events() {
// メッセージタイプ(イベントのタイプ)が ``Records`` の場合にメッセージからデータを取り出す
v, ok := event.(*s3.RecordsEvent)
if ok {
fmt.Println(string(v.Payload))
}
}
メッセージタイプのよって処理を分岐させたい場合は以下のように switch
を使うとよいでしょう。
for event := range resp.EventStream.Events() {
switch v := event.(type) {
case *s3.RecordsEvent:
fmt.Println(string(v.Payload))
case *s3.StatsEvent:
fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
fmt.Println("Returned", *v.Details.BytesReturned, "bytes")
fmt.Println("Scanned", *v.Details.BytesScanned, "bytes")
case *s3.EndEvent:
fmt.Println("SelectObjectContent completed")
}
}
JSONファイルからのデータ抽出
サンプルコード
package main
import (
"fmt"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
func exitErrorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", args...)
os.Exit(1)
}
func main() {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
svc := s3.New(sess)
params := &s3.SelectObjectContentInput{
Bucket: aws.String("s3-select-sampleyy"),
Key: aws.String("sample.json"),
ExpressionType: aws.String(s3.ExpressionTypeSql),
Expression: aws.String("SELECT * FROM S3Object LIMIT 10"),
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String("NONE"),
JSON: &s3.JSONInput{
Type: aws.String("Lines"),
},
},
OutputSerialization: &s3.OutputSerialization{
JSON: &s3.JSONOutput{},
},
}
resp, err := svc.SelectObjectContent(params)
if err != nil {
exitErrorf("failed making API request", err)
}
defer resp.EventStream.Close()
for event := range resp.EventStream.Events() {
switch v := event.(type) {
case *s3.RecordsEvent:
fmt.Println(string(v.Payload))
case *s3.StatsEvent:
fmt.Println("Processed", *v.Details.BytesProcessed, "bytes")
fmt.Println("Returned", *v.Details.BytesReturned, "bytes")
fmt.Println("Scanned", *v.Details.BytesScanned, "bytes")
case *s3.EndEvent:
fmt.Println("SelectObjectContent completed")
}
}
if err := resp.EventStream.Err(); err != nil {
exitErrorf("failed to read from SelectObject EventStream, %v", err)
}
}
リクエストパラメータ
InputSerialization
で JSON
を指定します。 JSONの Type
については以下のブログ記事を参照ください。
ストリーム処理
AWS Developer Blogには、io.Pipeを使ってS3 Select取得したデータをcsv.Readerに渡す例が掲載されています。こちらを参考に、S3 Selectで取得したデータをgzipで圧縮してS3にアップロードするコードを書いてみました。(S3にアップロードされたRAWデータをS3 Selectで加工 → gzipに圧縮 → Amazon Redshiftにロード、といったユースケースをイメージしています。)
package main
import (
"fmt"
"os"
"compress/gzip"
"io"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
func exitErrorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", args...)
os.Exit(1)
}
func main() {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("ap-northeast-1")))
svc := s3.New(sess)
params := &s3.SelectObjectContentInput{
Bucket: aws.String("s3-select-sampleyy"),
Key: aws.String("sample.csv"),
ExpressionType: aws.String(s3.ExpressionTypeSql),
Expression: aws.String("SELECT * FROM S3Object LIMIT 10"),
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String("NONE"),
CSV: &s3.CSVInput{
FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
},
},
OutputSerialization: &s3.OutputSerialization{
CSV: &s3.CSVOutput{},
},
}
resp, err := svc.SelectObjectContent(params)
if err != nil {
exitErrorf("failed making API request", err)
}
defer resp.EventStream.Close()
pr, pw := io.Pipe()
go func() {
gw := gzip.NewWriter(pw)
defer pw.Close()
defer gw.Close()
for event := range resp.EventStream.Events() {
switch e := event.(type) {
case *s3.RecordsEvent:
gw.Write(e.Payload)
case *s3.StatsEvent:
fmt.Printf("Processed %d bytes\n", e.Details.BytesProcessed)
}
}
}()
uploader := s3manager.NewUploader(sess)
key := "upload.csv.gzip"
result, err := uploader.Upload(&s3manager.UploadInput{
Body: pr,
Bucket: aws.String("s3-select-sampleyy"),
Key: aws.String(key),
})
if err != nil {
exitErrorf("failed to upload %s\n, %v", key, err)
}
fmt.Printf("Successfully uploaded %s to %s\n", key, result.Location)
}
まとめ
Amazon S3 Selectは、SQLでデータの一部のみを高速に抽出できるという特徴から、従来はGet ObjectでS3からファイルをダウンロード&データ抽出していた処理の置き換えや、データ分析のETL処理での利用等が期待されます。
冒頭でご紹介したAWS Developer Blogには各AWS SDKでのコード例が掲載されていますので、皆さんもぜひお好みの言語でAmazon S3 Selectを試してみたください。