この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
最近「Go言語による並行処理」を読んでGoルーティンとチャネルを組み合わせたプラグラムを試してみたかったので練習がてらS3へのストリーミングアップロードを行うサンプルを書いてみました。
やること
このプログラムでやりたいことは以下の通りです。
- データソースからデータを読み込んでinputチャネルに送信する
- inputチャネルからデータを読み込んでS3へストリーミングアップロードする
- アップロード先のキーはファイルが特定のサイズになるごとに切り替える(チャンク化する)
作戦
チャンク化を以下のように行います。
- ストリーミングアップロードを行うためにio.Pipeを使ってinputチャネルから読んだデータをSDKのmanager.Uploaderにフィードする(io.Readerを渡せる)
- Uploaderにフィードしたデータ件数を数えておいて一定数に達したらパイプをクローズしアップロードを完了する
- inputチャネルがクローズされるまで上記を繰り返し行う
コード
package infra
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"go.uber.org/zap"
"io"
"sync"
)
type Uploader struct {
Bucket string
Prefix string
logger *zap.Logger
wg *sync.WaitGroup
}
func NewUpLoader(bucket string, prefix string, logger *zap.Logger, wg *sync.WaitGroup) Uploader {
return Uploader{
Bucket: bucket,
Prefix: prefix,
logger: logger,
wg: wg,
}
}
// ChunkedUpload チャンクサイズを指定してinputの内容をS3へアップロードする.
func (up *Uploader) ChunkedUpload(ctx context.Context, size int, input chan []byte) {
go up.upload(ctx, input, size)
}
func (up *Uploader) upload(ctx context.Context, input chan []byte, size int) {
up.wg.Add(1)
defer up.wg.Done()
for {
//チャンネルクローズ検出用のcontext
inputChannelContext, cancel := context.WithCancel(ctx)
func(cancel context.CancelFunc) {
r, w := io.Pipe()
defer w.Close()
up.startS3Upload(r)
var cnt = 0
for {
select {
case <-ctx.Done():
return
case in, ok := <-input:
if ok {
//チャンクサイズをインクリメント
cnt++
w.Write(in)
} else {
//クローズされたらコンテキストに通知
cancel()
}
//チャンクがいっぱいになったらクローズする
if cnt == size || !ok {
return
}
}
}
}(cancel)
//呼び出し側からのキャンセル or チャネルクローズで終了
select {
case <-ctx.Done():
return
case <-inputChannelContext.Done():
return
default:
}
}
}
func (up *Uploader) startS3Upload(r *io.PipeReader) {
up.wg.Add(1)
go func() {
defer up.wg.Done()
key := up.Prefix + uuid.NewString()
ctx := context.Background()
req := s3.PutObjectInput{
Bucket: &up.Bucket,
Key: &key,
Body: r,
}
logger := up.logger.With(zap.String("key", key))
logger.Debug("start S3 uploading")
if _, err := newS3Uploader(ctx).Upload(ctx, &req); err == nil {
logger.Info("upload done")
} else {
logger.Error(err.Error())
}
}()
}
func newS3Uploader(ctx context.Context) *manager.Uploader {
// localstackを使うためにエンドポイント設定を上書き
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: "http://localhost:4567",
HostnameImmutable: true,
}, nil
})
cfg, _ := config.LoadDefaultConfig(ctx, config.WithEndpointResolverWithOptions(resolver))
client := s3.NewFromConfig(cfg)
upload := manager.NewUploader(client)
return upload
}
実行してみる
これに次のようなデータソースを接続して試してみます。
package infra
import (
"context"
"fmt"
"go.uber.org/zap"
"sync"
"time"
)
type Source struct {
logger *zap.Logger
wg *sync.WaitGroup
}
func NewSource(logger *zap.Logger, wg *sync.WaitGroup) Source {
return Source{
logger: logger,
wg: wg,
}
}
func (source *Source) Feed(ctx context.Context, cnt int) chan []byte {
output := make(chan []byte)
source.wg.Add(1)
go func() {
defer close(output)
defer source.wg.Done()
for i := 0; i < cnt; i++ {
select {
case <-ctx.Done():
return
case output <- []byte(fmt.Sprintf("%d\n", i)):
}
time.Sleep(time.Millisecond * 5)
}
}()
return output
}
適当なサイズのデータを入力して実行してみます。
func main() {
ctx, cancel := context.WithCancel(context.Background())
logger, _ := zap.NewDevelopment()
wg := sync.WaitGroup{}
defer cancel()
upload := infra.NewUpLoader(
"bucket",
"prefix-",
logger,
&wg,
)
source := infra.NewSource(logger, &wg)
in := source.Feed(ctx, 5000)
upload.ChunkedUpload(ctx, 1000, in)
wg.Wait()
}
出力はこんな感じです。
2022-02-22T15:46:00.259+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-13fea82d-bcb0-4fab-83be-d9a67576560e"}
2022-02-22T15:46:06.202+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a"}
2022-02-22T15:46:11.220+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-13fea82d-bcb0-4fab-83be-d9a67576560e"}
2022-02-22T15:46:12.114+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac"}
2022-02-22T15:46:17.144+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a"}
2022-02-22T15:46:18.121+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-c607370b-947d-47a6-8067-7b9a36333e1f"}
2022-02-22T15:46:23.138+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac"}
2022-02-22T15:46:24.105+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec"}
2022-02-22T15:46:29.130+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-c607370b-947d-47a6-8067-7b9a36333e1f"}
2022-02-22T15:46:30.099+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7"}
2022-02-22T15:46:35.143+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec"}
2022-02-22T15:46:35.144+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7"}
実際にS3上に作成されるファイルは以下のようになります。サイズが異なるのはデータとして連番の数値を使っていてその桁数が異なるためです。(チャンク数とファイル数が空ファイルの設定次第で空ファイルができてしまうのも気になります)
awslocal --endpoint-url=http://localhost:4567 s3 ls s3://bucket/
2022-02-22 15:46:11 3890 prefix-13fea82d-bcb0-4fab-83be-d9a67576560e
2022-02-22 15:46:35 0 prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7
2022-02-22 15:46:23 5000 prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac
2022-02-22 15:46:35 5000 prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec
2022-02-22 15:46:29 5000 prefix-c607370b-947d-47a6-8067-7b9a36333e1f
2022-02-22 15:46:17 5000 prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a
まとめ
今更ですが日々の暮らしのちょっとした平行処理にGoルーティンは便利だなと思いました。