この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
福岡オフィスのyoshihitohです。 この記事はAWS Lambda Custom Runtimes芸人 Advent Calendar 2018の 12日目 です。
はじめに
先日のre:Invent 2018で Custom Runtimes が発表されて、RustでLambda Functionを実装できるようになりました。
今回は下記の記事で紹介した Rusoto を使ってAWSのサービスにアクセスしてみます!
検証環境
- macOS: 10.13.6
- Rust: 1.31.0
- musl-cross: stable 0.9.7
試してみること
S3にファイルがアップロードされたら問答無用でGZIP圧縮を強制してみます。Lambda Functionの処理対象はS3のイベント設定で Prefix
および Suffix
を指定して絞り込みます。
Lambda Functionで実装する内容は以下の通りです。
- RusotoをAWS Lambda向けにビルドできるようにする
- S3のイベントに対応する構造体を実装する
- S3からデータをダウンロードする
- ダウンロードしたデータをGZIP圧縮する
- GZIP圧縮したデータをアップロードする
- S3のイベントを発火したオブジェクトを削除する
Rusotoを AWS Lambda向けにビルドする
今回は上記の記事と同様に musl-cross を利用してAWS Lambda環境向けにクロスビルドします。
Rusotoをデフォルトの設定でビルドすると、 openssl に依存しているためビルドエラーになります。
$ cargo build --release --target x86_64-unknown-linux-musl
Compiling pkg-config v0.3.14
Compiling openssl v0.10.15
Compiling foreign-types-shared v0.1.1
Compiling native-tls v0.2.2
Compiling openssl-probe v0.1.2
Compiling foreign-types v0.3.2
Compiling rusoto_core v0.36.0
Compiling openssl-sys v0.9.39
error: failed to run custom build command for `openssl-sys v0.9.39`
process didn't exit successfully: `/Users/yoshihitoh/workspace/projects/blog/serverless/custom-runtime-rust/compress-csv/target/release/build/openssl-sys-48e447cac359b72b/build-script-main` (exit code: 101)
これは困った。。と思ったら MediumにKonstantinさんによる素晴らしい記事 があがっており、Rusotoの設定を変更することで対処できることがわかりました。
rustls というRust実装のTLSライブラリを使用することでクロスビルドできるようになります。Cargo.tomlの [dependencies]
セクションを以下のように変更します。
Cargo.toml
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
flate2 = "1.0"
futures = "0.1"
lambda_runtime = "0.1"
log = "^0.4"
rusoto_core = { version = "0.36.0", default_features = false, features = ["rustls"] }
rusoto_s3 = { version = "0.36.0", default_features = false, features = ["rustls"] }
serde = "^1"
serde_derive = "^1"
simple_logger = "^1"
default_features
を無効化することに加えて、 features
で rustls を使用するように設定します。これでクロスビルドできるようになります。
$ cargo build --release --target x86_64-unknown-linux-musl
Compiling semver-parser v0.7.0
Compiling libc v0.2.45
Compiling void v1.0.2
... 略
Finished release [optimized] target(s) in 6.98s
イベントを受け取る
S3のイベントを受け取れるように構造体を用意します。Lambda Functionテスト用のS3 PUTのイベントを参考に構造体を定義します。
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "ap-northeast-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "example-bucket",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::example-bucket"
},
"object": {
"key": "test/key",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
}
serde を使って構造体をシリアライズ/デシリアライズできるようにしていきます。今回は簡略化するため必要最低限の項目だけを対象にしますが、他の項目も同様の手順で対応できると思います。
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Bucket {
name: String,
arn: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Object {
key: String,
size: usize,
#[serde(rename = "eTag")]
etag: String,
sequencer: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct S3Data {
bucket: Bucket,
object: Object,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Record {
event_source: String,
aws_region: String,
event_time: DateTime<Utc>,
event_name: String,
s3: S3Data,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct S3PutEvent {
#[serde(rename = "Records")]
records: Vec<Record>,
}
Rustのフィールド名はスネークケースを使うことが多いと思いますが、 Record
のフィールド名はキャメルケースです。そこで #[serde(rename_all = "camelCase")]
の指定を追加してキャメルケースに対応させます。
参考
残りの部分を実装
ハンドラ関数のメイン制御です。
#[derive(Serialize, Deserialize)]
struct S3PutResponse {
compressed: Vec<String>,
ignored: Vec<String>,
}
enum CompressError {
Ignored(String),
}
type CompressResult<T> = Result<T, CompressError>;
fn default_profile_client(region: Region) -> S3Client {
S3Client::new(region)
}
fn my_handler(event: S3PutEvent, _ctx: Context) -> Result<S3PutResponse, HandlerError> {
let s3 = default_profile_client(Region::ApNortheast1);
let (compressed, ignored): (Vec<CompressResult<String>>, Vec<CompressResult<String>>) = event.records.iter().map(|r| {
compress_csv(&s3, r)
.and_then(|bytes| { upload_gzip(&s3, r, bytes) })
.and_then(|_| delete_csv(&s3, r))
}).partition(|r| {
r.is_ok()
});
let compressed = compressed.iter().map(|r| r.as_ref().ok().unwrap().to_string()).collect();
let ignored = ignored.iter().map(|r| {
match r.as_ref().err().unwrap() {
CompressError::Ignored(uri) => uri.to_string(),
_ => String::new(),
}
}).collect();
Ok(S3PutResponse { compressed, ignored, })
}
試したいことに書いた手順の通りに制御しています。次にダウンロードと圧縮処理を実装します。
fn compress_csv(s3: &S3, record: &Record) -> CompressResult<Vec<u8>> {
let get_req = GetObjectRequest{
bucket: record.s3.bucket.name.clone(),
key: record.s3.object.key.clone(),
..Default::default()
};
let get_res = s3.get_object(get_req).sync();
if let Ok(get_out) = get_res {
// S3のデータを読み込みながらGZIP圧縮する
let mut enc = GzEncoder::new(Vec::new(), Compression::default());
for chunk in get_out.body.unwrap().take(512 * 1024).wait() {
enc.write_all(chunk.unwrap().as_mut_slice()).unwrap();
}
match enc.finish() {
Ok(v) => Ok(v),
Err(_) => Err(CompressError::Ignored(record.s3_uri())),
}
}
else {
Err(CompressError::Ignored(record.s3_uri()))
}
}
S3からオブジェクトをダウンロードしながらGZIP圧縮しています。 body
データの取得部分がよくわからなかったのですが、 futures::stream::Stream からデータを取り出す実装を書けば良さそうです。 s4 という、RustからS3を手軽に扱うためのライブラリの実装を参考に、 .take()
と .wait()
を使って実装しました。
また、今回は簡易実装のため同期版のAPIを使用しています。実戦投入する場合は非同期APIで実行した方が効率よくリソースを使用できてよさそうです。
ここまででGZIP圧縮したデータが準備できました。GZIP圧縮したデータをS3にアップロードします。
fn upload_gzip(s3: &S3, record: &Record, gz_bytes: Vec<u8>) -> CompressResult<()> {
let put_req = PutObjectRequest {
bucket: record.s3.bucket.name.clone(),
key: format!("{}.gz", &record.s3.object.key),
body: Some(ByteStream::from(gz_bytes)),
..Default::default()
};
s3.put_object(put_req).sync()
.map(|_| ())
.map_err(|_| CompressError::Ignored(record.s3_uri()))
}
最後にイベントを発火したオブジェクトを削除します。
fn delete_csv(s3: &S3, record: &Record) -> CompressResult<String> {
let del_req = DeleteObjectRequest {
bucket: record.s3.bucket.name.clone(),
key: record.s3.object.key.clone(),
..Default::default()
};
s3.delete_object(del_req).sync()
.map(|_| record.s3_uri())
.map_err(|_| CompressError::Ignored(record.s3_uri()))
}
ここまでで実装完了です。実際に動かしてみましょう
動作確認
アプリケーションのアップロード方法は他の記事と同様のため省略します。詳細は下記の記事を確認ください。
S3のイベントトリガーを作成します。
対象のバケット、 Prefix
および Suffix
を指定します。
ファイルをアップロードします。
少ししてから再読込します。
ちゃんとGZIP圧縮されていますね!
おわりに
今回はAWS LambdaのCustom Runtimesを使って、RustからAWSのサービス(S3)を操作してみました。
Rust実装のライブラリが増えてきたおかげでクロスビルドしたアプリケーションでも色々なことができるようになってきています。Rustの特性を活かして起動の速さ・実行速度が求められる処理がでてきたら実践投入してみたいと思います!