[Rust] try_unfoldで複数ページに分割されたレスポンスを非同期ストリームにする

futures-rsのtry_unfoldを使って、AWS APIのレスポンスを非同期ストリームにしてみました。今回はCloudWatch LogsのDescribeLogGroups APIで動かしてみましたが、他のAPIでも同様に非同期ストリーム化できると思います。
2020.07.07

検索や一覧取得のAPIを実行すると、結果を複数ページに分割して受け取ることがあります。DynamoDBの scanquery とか、CloudWatch Logsの describe_* とか、AWSのAPIでもよく見るやつですね。

futures-rsstream::try_unfold を使って、こういったページ単位で分割されたデータを非同期ストリームとして読む方法を試してみました。

検証環境

  • macOS : 10.14.6
  • Rust : 1.44.1 (c7087fe00 2020-06-17)

非同期ストリームの作り方

色んなやり方がありそうですが、今回は futures::stream::try_unfold を使って CloudWatch Logsのロググループを列挙してみます。エラー情報が不要な場合は futures::stream::unfold を使うと良さそうです。

他にも async-streamtry_stream! を使う方法や、 futures-rsfutures::channel::mpsc::channel を使うなど、いろいろな方法があるようです。OPTiM社のTech blogで非常に詳しく解説されておりますので、未読の方はぜひ。

Rustの非同期プログラミングをマスターする - OPTiM TECH BLOG

依存ライブラリ

Cargo.toml の内容です

[dependencies]
futures = "0.3"
pin-utils = "0.1"
rusoto_core = "0.44"
rusoto_logs = "0.44"
tokio = "0.2"

試してみる

私のAWS環境にはロググループが24個あります。DescribeLogGroups1回で5件ずつ取得してみます。5ページに分割されて、各5件・最終ページのみ4件になればOKです。

$ aws logs describe-log-groups | jq .logGroups | jq length
24

手続き的に処理する場合

ロググループの列挙を手続き的に処理する場合はこのように書けると思います。

use rusoto_core::{Region, RusotoError};
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogGroupsError, DescribeLogGroupsRequest,
    DescribeLogGroupsResponse,
};

async fn print_log_groups_procedure(
    client: CloudWatchLogsClient,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut request = DescribeLogGroupsRequest {
        limit: Some(5),
        ..Default::default()
    };

    // ロググループを列挙するための変数
    let mut has_next = true;
    let mut n_iter = 0;
    while has_next {
        n_iter += 1;
        println!("{}回目", n_iter);

        // ロググループの取得
        let response = client.describe_log_groups(request.clone()).await?;

        // ロググループを使った処理
        for lg in response.log_groups.unwrap_or_else(Vec::new) {
            println!("name:{:?}, size:{:?}", lg.log_group_name, lg.stored_bytes);
        }

        // 次のページに進むための処理
        request.next_token = response.next_token;
        has_next = request.next_token.is_some();
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = CloudWatchLogsClient::new(Region::default());
    print_log_groups_procedure(client).await?;

    Ok(())
}

これを実行するとこんな感じになります。(環境により結果が異なります。)

$ cargo run
1回目
name:Some("/aws/apigateway/xxxxxxx"), size:Some(109)
name:Some("/aws/lambda/xxxxx-xxxxxxx"), size:Some(5153)
name:Some("/aws/lambda/xxxxxxxxxxxxxx"), size:Some(590)
name:Some("/aws/lambda/xxxxxxxx"), size:Some(35103)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxxx-xxxxxxxx"), size:Some(472)
2回目
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxx"), size:Some(13384)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxx"), size:Some(51610)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxxx"), size:Some(35034)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxxxx"), size:Some(579)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxx"), size:Some(96615)
3回目
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxx-xxxxxxx"), size:Some(2262)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxx"), size:Some(70913)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxxxx-xxxxxxxxx"), size:Some(20543938)
name:Some("/aws/lambda/xxxxxx-xxxxxxx-xxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxx"), size:Some(111945)
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xx-xxxxxxxx-xxxxxxx-xxxxxxxx"), size:Some(3058)
4回目
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xx-xxxxxx-xxxxxxx-xxxxxxxx"), size:Some(3894)
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xxxxx-xxxxxx-xxxxx-xxxxxxxx"), size:Some(47351)
name:Some("/aws/lambda/xxxxxxx-xxxx"), size:Some(1174)
name:Some("/aws/lambda/xx-xxxxxx-xxx"), size:Some(10040)
name:Some("/aws/lambda/xx-xxxxxx-xxx-xxxxxx-xxxxxx"), size:Some(1021)
5回目
name:Some("/aws/lambda/xxxxxx-xxx"), size:Some(710)
name:Some("/aws/states/xxxxx-xxxxxxxxx-xxxxx-xxxx"), size:Some(6344)
name:Some("xxxxxxxxxxxx"), size:Some(0)
name:Some("xxxxxxxxxxxx"), size:Some(0)

ロググループの列挙方法と、それを扱って処理する際の知識が混在していて少し読みづらくなると思います。今回のロググループのように件数が少ない場合は Vec<DescribeLogGroupsResponse> に突っ込んで処理を分割しても良いんですが、ログイベントやDynamoDBのアイテムの場合はそうもいきません。そこで非同期ストリームを使います。

非同期ストリームにする場合

ロググループの列挙は非同期ストリームにまかせてみましょう。

use futures::{Stream, StreamExt};       // 追加
use rusoto_core::{Region, RusotoError};
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogGroupsError, DescribeLogGroupsRequest,
    DescribeLogGroupsResponse,
};

// ロググループの列挙に関する情報のみ取り扱う
pub fn log_groups_stream(
    client: CloudWatchLogsClient,
) -> impl Stream<Item = Result<DescribeLogGroupsResponse, RusotoError<DescribeLogGroupsError>>> {
    enum State {
        Next(CloudWatchLogsClient, DescribeLogGroupsRequest, i32),
        Complete,
    }

    let request = DescribeLogGroupsRequest {
        limit: Some(5),
        ..Default::default()
    };

    futures::stream::try_unfold(State::Next(client, request, 1), |state| async {
        match state {
            State::Next(client, mut request, n_iter) => {
                println!("{}回目", n_iter);

                // ロググループの取得
                let response = client.describe_log_groups(request.clone()).await?;

                // 次に読むべき情報があるか判定する
                let next_state = if response.next_token.is_some() {
                    // next_tokenがある場合、次のループで続きを読ませる
                    request.next_token = response.next_token.clone();
                    State::Next(client, request, n_iter + 1)
                } else {
                    // next_tokenがない場合は次のループでストリームを終了させる
                    State::Complete
                };

                // レスポンスと状態値を返す (タプル形式で)
                Ok(Some((response, next_state)))
            }
            State::Complete => Ok(None), // ストリームが終端に達しているため返すデータなし
        }
    })
}

// ロググループを処理するが、列挙方法はストリームにまかせる
async fn print_log_groups_stream(
    client: CloudWatchLogsClient,
) -> Result<(), Box<dyn std::error::Error>> {
    // ログ列挙用の非同期ストリームを取得
    let groups = log_groups_stream(client);

    // while let文で非同期ストリームを扱う場合はピン必須
    pin_utils::pin_mut!(groups);

    // ロググループを列挙しながら処理する (列挙の詳細はストリームが隠蔽してる)
    while let Some(r) = groups.next().await {
        // Result => OKの場合だけ処理する
        let response = r?;

        // ロググループを使った処理
        for lg in response.log_groups.unwrap_or_else(Vec::new) {
            println!("name:{:?}, size:{:?}", lg.log_group_name, lg.stored_bytes);
        }
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = CloudWatchLogsClient::new(Region::default());
    // print_log_groups_procedure(client).await?;
    print_log_groups_stream(client).await?;

    Ok(())
}

コードの総量は増えましたが、ログループの列挙・使う部分を分割することができました。同じ動作になるか動かしてみましょう。

$ cargo run
1回目
name:Some("/aws/apigateway/xxxxxxx"), size:Some(109)
name:Some("/aws/lambda/xxxxx-xxxxxxx"), size:Some(5153)
name:Some("/aws/lambda/xxxxxxxxxxxxxx"), size:Some(590)
name:Some("/aws/lambda/xxxxxxxx"), size:Some(35103)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxxx-xxxxxxxx"), size:Some(472)
2回目
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxx"), size:Some(13384)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxx"), size:Some(51610)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxxx"), size:Some(35034)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxxxx"), size:Some(579)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxx"), size:Some(96615)
3回目
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxx-xxxxxxx"), size:Some(2262)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxx-xxxxxxxxx"), size:Some(70913)
name:Some("/aws/lambda/xxxxxx-xxx-xxx-xxxxxxxxx-xxxxxxxxx"), size:Some(20543938)
name:Some("/aws/lambda/xxxxxx-xxxxxxx-xxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-xxxxxxxxxxxx"), size:Some(111945)
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xx-xxxxxxxx-xxxxxxx-xxxxxxxx"), size:Some(3058)
4回目
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xx-xxxxxx-xxxxxxx-xxxxxxxx"), size:Some(3894)
name:Some("/aws/lambda/xxxxx-xxxxxxxxx-xxx-xxxxx-xxxxxx-xxxxx-xxxxxxxx"), size:Some(47351)
name:Some("/aws/lambda/xxxxxxx-xxxx"), size:Some(1174)
name:Some("/aws/lambda/xx-xxxxxx-xxx"), size:Some(10040)
name:Some("/aws/lambda/xx-xxxxxx-xxx-xxxxxx-xxxxxx"), size:Some(1021)
5回目
name:Some("/aws/lambda/xxxxxx-xxx"), size:Some(710)
name:Some("/aws/states/xxxxx-xxxxxxxxx-xxxxx-xxxx"), size:Some(6344)
name:Some("xxxxxxxxxxxx"), size:Some(0)
name:Some("xxxxxxxxxxxx"), size:Some(0)

ちゃんと同じ結果になりましたね!

try_unfoldについて補足

futures-rsのドキュメント の説明がわかりやすいと思います。第1引数に初期状態を指定し、第2引数に実際のストリーム処理を行うクロージャを渡します。以下ドキュメントのサンプルコードの抜粋です。

use futures::stream::{self, TryStreamExt};

let stream = stream::try_unfold(0, |state| async move {
    if state < 0 {
        return Err(SomeError);
    }

    if state <= 2 {
        let next_state = state + 1;
        let yielded = state * 2;
        Ok(Some((yielded, next_state)))
    } else {
        Ok(None)
    }
});

let result: Result<Vec<i32>, _> = stream.try_collect().await;
assert_eq!(result, Ok(vec![0, 2, 4]));

try_unfold の場合は成否を Result で返すため、成功した場合は Ok に列挙値を、失敗した場合は Err に失敗要因を指定します。処理が成功した場合はデータの有無によって返す値が異なり、 データがある場合は Some(列挙値, 状態値) を返し、ストリームの終端に達した場合は None を返します。

ドキュメントの例の場合は以下のようになります。

  • state < 0
    • 異常値で処理が失敗
  • 0 ≤ state ≤ 2
    • 処理成功・データあり
  • state ≥ 3
    • 処理成功・データなし

終わりに

Rustの非同期ストリームを試してみましたが、コンパイル通すだけでも結構大変でした。 ちゃんと理解していない事が多いなと感じており、 async-book も読み始めました。 また書いてる量も圧倒的に足りていないので、もっと書いてすんなり動かせるように頑張ります!

参考