[初心者向け] Kinesis Data Streams に Lambda でデータを流す様子を理解するために図を書いた

2023.11.08

コーヒーが好きな emi です。

Kinesis Data Streams にデータを流し込むときのデータの動きや構造を理解するために絵を描いてみました。
また、Lambda から Kinesis Data Streams に実際にデータを流し込み、挙動とデータの順番を見てみました。

Kinesis Data Streams の内部の仕組みと用語解説

見覚えのある方も多いかもしれませんが、Kinesis Data Streams のアーキテクチャは以下のようになっています。

  • プロデューサー
    • プロデューサーと呼ばれる各種アプリケーションや AWS サービスから Kinesis Data Streams にデータを流し込みます。
  • コンシューマー
    • Kinesis Data Streams からレコードを取得して処理します。
  • シャード
    • 流れてくる一連のデータレコードを保持します。データが乗っかるベルトコンベアのようなものとイメージいただけると良いです。1 つの Kinesis Data Streams の中には複数のシャードが存在します。シャードが増えれば増えるほど、一度に流せるデータの量が増えます。Kinesis Data Streams のスケーリングの単位でもあります。
  • データレコード
    • プロデューサーから流れてくるデータの単位です。

データレコードは以下のように「シーケンス番号」「パーティションキー」「データ BLOB」から成り立っています。

  • シーケンス番号(「シーケンス」には「連続」「順序」などの意味があります)
    • 各データレコードに一意に割り当てられる識別子です。同じシャード内のレコードを順番に並べるために使用されます。
    • 新しいレコードが追加されるたびに、そのシーケンス番号は前のレコードのシーケンス番号よりも大きくなります。これにより、レコードの追加順序が保持されます。
    • 書き込みリクエスト間の時間が長くなるほど、シーケンス番号は大きくなります。
  • パーティションキー
    • パーティションキーは、Kinesis Data Streams がデータレコードを特定のシャードにルーティングするために使用する文字列です。
    • パーティションキーが同じ場合、データは同じシャードに割り当てられます。

パーティションキーがすべて一律なら、全てのデータが同じシャードにルーティングされます。こうすると、ストリーム内の複数のシャードを活用して並行・リアルタイム処理を行うという Kinesis Data Streams の主要な利点を活かせない場合があります。そのため、適切にデータをロードバランシングするためには、パーティションキーを適切に選択することが重要です。

  • データ BLOB
    • ストリームに追加する具体的なデータです。
    • BLOB(Binary Large OBject)は大量のバイナリデータを指す一般的な用語で、Kinesis Data Streams で保持できるデータ BLOB の大きさは最大 1 MB です。
    • データ BLOB は任意のバイナリデータ(テキスト、画像、音声など)を含むことができます。

検証

実際に動かしてみながら挙動を確認しましょう。
検証は東京リージョンでおこないます。

構成図

今回は Lambda から Kinesis Data Streams にデータを流し込んでみます。

Kinesis Data Streams の作成

Kinesis コンソールにアクセスし、「今すぐ始める」で「Kinesis Data Streams」をチェックして「データストリームを作成」をクリックします。

  • データストリーム名:任意のデータストリーム名(今回は「Kinesis_Kinesis_Lambda_test」としています)
  • 容量モード:今回はオンデマンドを選択
    • オンデマンド:データストリームの容量(シャード数)が自動スケールします
    • プロビジョンド:データストリームの容量(シャード数)を固定します

「データストリームの作成」をクリックします。

「新しいデータストリームの作成には最大 10 分かかることがあります」と表示されていますが、今回は 10 秒ほどで作成が完了しました。

作成した Kinesis Data Streams の ARN をコピーして控えておいてください。Lambda 関数の IAM ロールに付与する IAM ポリシーの作成時に使用します。

Lambda の作成

Lambda コンソールを開き、[関数] - [関数の作成] をクリックします。

  • 一から作成
  • 関数名:今回は「InLambda_Kinesis_Lambda_test」という名前の関数を作成しました。
  • ランタイム:Python 3.11
  • アーキテクチャ:x86_64

「関数の作成」をクリックします。

Lambda に権限を付与する

AWS マネジメントコンソールから Lambda 関数を作成すると、デフォルトで CloudWatch Logs にログを書き込むアクセス許可を持つ実行ロールが自動作成され付与されます。
このデフォルトの IAM ロールに、Kinesis Data Streams にデータを Put するための権限を追加しましょう。

・ポリシーを作成

IAM コンソールより [ポリシー] - [ポリシーの作成] で、以下のようなカスタマー管理ポリシーを作成してください。
IAM ポリシーの名前は「Lambda_Kinesis_PutRecordPermission」としました。

Lambda_Kinesis_PutRecordPermission

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PutRecordPermission",
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/Kinesis_Kinesis_Lambda_test"
        }
    ]
}

"Resource" には、コピーしておいた Kinesis Data Streams の ARN を入れてください。
これが、Kinesis Data Streams にデータを Put するための IAM ポリシーです。

・ポリシーのアタッチ

作成した「InLambda_Kinesis_Lambda_test」の [設定タブ] - [アクセス権限] - [実行ロール] より、Lambda に付与された IAM ロールのリンクをクリックしてください。IAM ロールの詳細が別タブで開きます。

IAM ロールの詳細画面で、[許可を追加] - [ポリシーをアタッチ] より、先程作成した Kinesis Data Streams にデータを Put するための IAM ポリシー「Lambda_Kinesis_PutRecordPermission」をアタッチします。

これで Kinesis Data Streams にデータを Put するための権限を Lambda 関数に追加できました。

関数コードの作成

以下のように関数コードを作成します。
「lambda_function.py」の中身を以下のコードに書き換えてください。

lambda_function.py

import json
import boto3
import random
import logging

# CloudWatch Logsにログを出力するための設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    client = boto3.client('kinesis')
    messages = ["オーレオレ", "サンバ", "ソーレソレ", "ポンポコ"]

    for _ in range(15):
        message = random.choice(messages)
        client.put_record(
            StreamName='Kinesis_Kinesis_Lambda_test', 
            Data=json.dumps(message),
            PartitionKey='matsukensamba'  # 同じパーティションキーを使用
        )

        # デコードしたデータをCloudWatch Logsに表示
        logger.info(f"Put data: {message}")

    return {
        'statusCode': 200,
        'body': json.dumps('Successfully sent messages to Kinesis Data Streams!')
    }

この関数は、Lambda から「"オーレオレ"」「"サンバ"」「"ソーレソレ"」「"ポンポコ"」の 4 つの文字列の中からランダムに選択した文字列を Kinesis Data Streams のストリームに送信します。
関数コードの詳細を確認したい方は以下のトグルを展開してください。

Lambda 関数コードの解説(クリックで展開)
  • 必要なモジュールをインポートしています。
    • json:JSON データのエンコードとデコードを行うため
    • boto3:AWS のサービスに対する Python インターフェースを提供するライブラリ
    • random: ランダムな要素の選択や数値の生成を行うため
    • logging:ログ出力を行うため
  • CloudWatch Logs へログ出力の設定をおこなっています。logger.setLevel(logging.INFO) により、INFO レベル以上のログが出力されます。
  • lambda_handler 関数は、Lambda から呼び出されるメインの関数です。この関数は eventcontext の 2 つの引数を受け取ります。
    • event:Lambda 関数をトリガーしたイベントの詳細を含む辞書型データ
    • context:Lambda 関数の実行に関する情報を含むオブジェクト
  • boto3.client('kinesis') で Kinesis クライアントを作成します。
  • messages リストには、Kinesis Data Streams に送信するメッセージが格納されています。
  • for ループを使って、15 回メッセージを Kinesis Data Streams に送信します。
    • random.choice(messages)messages リストからランダムにメッセージを選択します。
    • 選択したメッセージは client.put_record() メソッドで Kinesis Data Streams に送信されます。このメソッドには以下のパラメータが渡されます。
    • StreamName:データを送信するストリームの名前
    • Data:送信するデータ。メッセージを JSON 形式に変換している
    • PartitionKey:データを分割するためのキー。同じキーのデータは同じシャードに配置されます。
  • logger.info(f"Put data: {message}") で、送信したメッセージを CloudWatch Logs に出力します。
  • 最後に HTTP ステータスコード 200 とメッセージを含む辞書型データを JSON 形式の文字列に変換して返します。

===解説ここまで===

このコードでは流し込むすべてのデータに「matsukensamba」という文字列をパーティションキーに設定しています。
パーティションキーは数字である必要はありません。
パーティションキーは UTF-8 エンコーディングで最大 256 バイトの文字列を指定できます。 *1
この関数コードのように「matsukensamba」という文字列をパーティションキーとして使用することも可能です。

今回は少量データの検証のため、一律のパーティションキーを割り振ることでコードを簡素化しています。

Lambda から Kinesis Data Streams にデータを投入してみる

作成した Lambda 関数を実行して、Kinesis Data Streams にデータを投入します。

Lambda 関数の詳細でテストタブを開き、特に設定変更等はせず「テスト」をクリックします。

テストが成功すると、緑のバーで「成功」と表示されます。

Lambda から送信されたデータの確認

Lambda 関数のテスト実行後、CloudWatch Logs より、Lambda 関数の実行ログを検索・確認します。

  • ①CloudWatch コンソールで [ログ] - [ログのインサイト(CloudWatch Logs Insight)] を開きます。
  • ②タイムゾーンが「Local timezone」(日本時間)になっていることを確認します。
  • ③Lambda 関数を実行した時間を指定します。
  • ④ロググループを選択します。今回は Lambda 関数「InLambda_Kinesis_Lambda_test」のロググループである「/aws/lambda/InLambda_Kinesis_Lambda_test」を選択しています。
  • ⑤ログを検索するためのクエリを書きます。今回は 20 行のログを昇順(asc、早い時間→遅い時間)で表示するため、以下のようにクエリを書きます。
fields @timestamp, @message
| sort @timestamp asc
| limit 20
  • ⑥「クエリの実行」をクリックすると、以下のようにログが表示されます。

実際に文字列が流し込まれた箇所だけ拡大します。

Lambda から「"オーレオレ"」「"サンバ"」「"ソーレソレ"」「"ポンポコ"」の 4 つの文字列の中からランダムに選択した文字列が送信されているのが分かります。

Kinesis Data Streams に格納されたデータの確認

では、Kinesis Data Streams に格納されたデータを見てみます。
Kinesis コンソールで対象の Kinesis Data Streams の詳細を開き、データビューワータブをクリックします。
今回はパーティションキーを一律で「matsukensamba」としたので、一つのシャードにすべてのデータが格納されたはずです。

  • シャード:shardId-000000000003
  • 開始位置:タイムスタンプで
  • 開始日:Lambda 関数を実行した日
  • 開始時刻:Lambda 関数を実行した時刻

と設定し、「レコードを取得」をクリックすると、以下のようにデータが格納されていました。

データ部分を拡大します。

"\u30aa\u30fc\u30ec\u30aa\u30ec"
"\u30aa\u30fc\u30ec\u30aa\u30ec"
"\u30dd\u30f3\u30dd\u30b3"
"\u30aa\u30fc\u30ec\u30aa\u30ec"
"\u30b5\u30f3\u30d0"
"\u30b5\u30f3\u30d0"
"\u30dd\u30f3\u30dd\u30b3"
"\u30bd\u30fc\u30ec\u30bd\u30ec"
"\u30dd\u30f3\u30dd\u30b3"
"\u30aa\u30fc\u30ec\u30aa\u30ec"
"\u30b5\u30f3\u30d0"
"\u30bd\u30fc\u30ec\u30bd\u30ec"
"\u30b5\u30f3\u30d0"
"\u30dd\u30f3\u30dd\u30b3"
"\u30b5\u30f3\u30d0"

1 つのデータをクリックすると、以下のように raw データ(生データ)が表示されます。

「JSON」に切り替えると、"サンバ" と表示されます。

"\u30b5\u30f3\u30d0" という文字列は、Unicode エスケープシーケンスを用いて「サンバ」という日本語文字列を表現したものです。
同様に他の文字列も日本語に直すと、以下のようになります。

"オーレオレ"
"オーレオレ"
"ポンポコ"
"オーレオレ"
"サンバ"
"サンバ"
"ポンポコ"
"ソーレソレ"
"ポンポコ"
"オーレオレ"
"サンバ"
"ソーレソレ"
"サンバ"
"ポンポコ"
"サンバ"

Lambda から送信されたデータと Kinesis Data Streams に格納されたデータの比較

Lambda から送信されたデータ(CloudWatch Logs で確認)と、Kinesis Data Streams に格納されたデータを比較してみます。

このように、Lambda から送信された順番のまま Kinesis Data Streams のシャードにデータが格納されていることが分かります。

おわりに

Kinesis Data Streams を理解するために図を描いてみました。 特にデータレコードとシャードの関係、シーケンス番号・パーティションキーの意味などは分かりにくかったので、絵に描いて整理できて少しすっきりしました。
どなたかのお役に立てば幸いです。

参考

脚注

  1. 「UTF-8でエンコード」とは、文字データを UTF-8 という形式でバイトデータに変換することを意味します。このプロセスは「エンコーディング」または「符号化」と呼ばれます。UTF-8 は Unicode Transformation Format 8-bit の略で、全世界の文字を表現するためのエンコーディングの一つです。UTF-8 は可変長エンコーディングであり、各文字を 1 バイトから 4 バイトで表現します。