AWS CLIを利用してRustでS3 Selectを実行する

こんにちは。サービスグループの武田です。RusotoでS3 Selectが未実装だったのでAWS CLIを利用して実行してみました。
2021.02.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。サービスグループの武田です。

最近はRustを書いています。難しい面もありますが、コンパイル時点でバグを弾けるのは強力ですね。

RustでAWSのサービスを使おうとすると、Rusotoを使用するのがデファクトではないでしょうか。非公式ながらSDKとしてしっかり使えます。さてRusotoを使用してプログラムを書いていたのですが、S3 Selectを実行しようとしたところ、これができませんでした。

Issueは上がっており、これがマージされれば使用可能になると思われますが、執筆時点では諦めざるを得ません。どうにかできないか考えたところ、AWS CLIを外部コマンドとして呼べばいけるんじゃね?ということで試してみました。

なお、ソースコードはGitHubに上がっています。

TAKEDA-Takashi/rust-call-aws-cli

検証環境

$ aws --version
aws-cli/2.1.10 Python/3.7.4 Darwin/19.6.0 exe/x86_64 prompt/off

$ rustc -V
rustc 1.50.0 (cb75ad5db 2021-02-10)

$ cargo -V
cargo 1.50.0 (f04e7fab7 2021-02-04)

$ sw_vers
ProductName:	Mac OS X
ProductVersion:	10.15.7
BuildVersion:	19H15

やってみた

まずはRustのプロジェクトを作成します。適当なディレクトリに移動してコマンドを実行しましょう。

$ cargo new rust-call-aws-cli

プロジェクトが作成できたら、必要な依存関係を書いておきます(dependencies以外は省略)。

Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

まずはシンプルに、RustからAWS CLIを実行するプログラムを書いてみます。

main.rs

use std::error::Error;
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let output = Command::new("aws")
        .args(&["sts", "get-caller-identity"])
        .output()
        .await?;
    println!("{:?}", output);

    Ok(())
}

Rustでの外部コマンド実行はstd::process::Commandが標準ライブラリに用意されていますが、Tokioが便利なので今回はこちらを使います。実行しているのはお馴染みのget-caller-identityですね。

実行結果はこちらです。

Output { status: ExitStatus(ExitStatus(0)), stdout: "{\n    \"UserId\": \"AROAIXEB753ZQNXCYORM2:botocore-session-1614134172\",\n    \"Account\": \"123456789012\",\n    \"Arn\": \"arn:aws:sts::123456789012:assumed-role/cm-takeda.takashi/botocore-session-1614134172\"\n}\n", stderr: "" }

どうやら問題なく実行できています。ちなみにdefaultプロファイルを使用していますので、それ以外を使う場合は明示的に指定してあげましょう。

それでは続いてS3 Selectを試していきましょう。まずは次のようなJSON Lines形式のファイルを用意し、S3バケットにアップロードします。バケットは任意で用意してください。ここではtestdata-xxxxバケットにアップロードしたものとします。

test_data.json

{"name": "Test", "code": 1939, "tags": "Dev", "lang": "ja"}
{"name": "IT Division", "code": 1, "tags": "Prod", "lang": "ja"}
{"name": "Sample", "code": 31, "lang": "en"}
{"name": "Classmethod", "code": 2, "lang": "en"}
{"name": "Classmethod2", "code": 19}

続いて、先ほど書いたプログラムを次のように修正します。

diff --git a/src/main.rs b/src/main.rs
index daec329..3eed65b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,10 +4,26 @@ use tokio::process::Command;
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     let output = Command::new("aws")
-        .args(&["sts", "get-caller-identity"])
+        .args(&[
+            "s3api",
+            "select-object-content",
+            "--bucket=testdata-xxxx",
+            "--key=test_data.json",
+            "--input-serialization",
+            r#"{"JSON":{"Type":"LINES"}}"#,
+            "--output-serialization",
+            r#"{"JSON":{"RecordDelimiter":"\n"}}"#,
+            "--expression",
+            "SELECT * FROM s3object s LIMIT 5",
+            "--expression-type=SQL",
+            "output.json",
+        ])
         .output()
         .await?;
     println!("{:?}", output);

+    let contents = tokio::fs::read("output.json").await?;
+    println!("{:?}", String::from_utf8(contents));
+
     Ok(())
 }

stsからs3apiに呼び出すコマンドが変わっています。またselect-object-contentは結果をファイルに出力するため、一度output.jsonに出力した後、そのファイルを読み込んでいます。実行してみましょう。

Output { status: ExitStatus(ExitStatus(0)), stdout: "", stderr: "" }
Ok("{\"name\":\"Test\",\"code\":1939,\"tags\":\"Dev\",\"lang\":\"ja\"}\n{\"name\":\"IT Division\",\"code\":1,\"tags\":\"Prod\",\"lang\":\"ja\"}\n{\"name\":\"Sample\",\"code\":31,\"lang\":\"en\"}\n{\"name\":\"Classmethod\",\"code\":2,\"lang\":\"en\"}\n{\"name\":\"Classmethod2\",\"code\":19}\n")

なんだかいい感じに取得できていますね!ただこのままだとプログラムからは扱いにくいので、本線ではありませんがデシリアライズもやってみましょう。次のように修正していきます。

diff --git a/src/main.rs b/src/main.rs
index 3eed65b..8026c9f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,15 @@
+use serde::Deserialize;
 use std::error::Error;
 use tokio::process::Command;

+#[derive(Debug, Deserialize)]
+struct TestData {
+    name: String,
+    code: u32,
+    tags: Option<String>,
+    lang: Option<String>,
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     let output = Command::new("aws")
@@ -20,10 +29,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ])
         .output()
         .await?;
-    println!("{:?}", output);
+
+    if Some(0) != output.status.code() {
+        panic!("{:?}", output);
+    }

     let contents = tokio::fs::read("output.json").await?;
-    println!("{:?}", String::from_utf8(contents));
+    for line in String::from_utf8(contents)?.lines() {
+        let d: TestData = serde_json::from_str(line)?;
+        println!("{:?}", d);
+    }

     Ok(())
 }

JSONのデシリアライズはserde_jsonを使用します。構造体を定義していくつか設定をするだけです。簡単ですね。JSON Linesは1行1データですので、改行コードでデータを分割し、それぞれをデシリアライズします。

さて実行結果は次のようになります。

TestData { name: "Test", code: 1939, tags: Some("Dev"), lang: Some("ja") }
TestData { name: "IT Division", code: 1, tags: Some("Prod"), lang: Some("ja") }
TestData { name: "Sample", code: 31, tags: None, lang: Some("en") }
TestData { name: "Classmethod", code: 2, tags: None, lang: Some("en") }
TestData { name: "Classmethod2", code: 19, tags: None, lang: None }

完璧ですね!

まとめ

RusotoでS3 Selectできるようになるまでのつなぎですが、やりたいことはできました。未実装のAPIがあるとは思っていなかったので、少し焦りました。みんなもRust書いていこう。