Greengrassコネクタを使って、Kinesis Data FirehoseからS3に保存してみた

はじめに

CX事業本部の佐藤です。 Greengrassではコネクタという機能があり、コネクタを使うことで、GreengrassデバイスからAWSサービスや他のクラウドサービスとの連携が容易になります。今回は、Kinesis Firehoseコネクタを使用してGreengrassデバイスからKinesis Firehoseへデータを送信し、S3に保存してみたいと思います。

Kinesis Data Firehoseの作成

Kinesis Data Firehoseの配信ストリームを作成します

  1. マネージメントコンソールからKinesisを選択します
  2. Kinesisのダッシュボードから配信ストリームの作成を選択します
  3. Delivery stream nameに適当な名前を入れます
  4. SourceDirect PUT or other sourcesを選択します
  5. Nextをクリックします
  6. Process Recordsの設定になりますが、今回はレコードの変換などは行わないため、そのままNextをクリックします
  7. DestinationAmazon S3を選択します
  8. S3 bucketにKinesis Firehoseから送信する対象のS3バケット名を選択します
  9. Nextをクリックします
  10. IAM roleCreate new or chooseをクリックし、別タブに遷移した後、許可をクリックします
  11. 設定内容を確認し、Nextをクリックします
  12. 配信ストリームが作られ、StatusActiveになればOKです

Greengrassグループの設定

次に、Greengrassグループを設定します。Greengrassグループの作成とデバイスへの証明書の設定などは割愛します。グループには適切なIAMロールを設定しておいてください。

Greengrass Lambdaの設定

  1. 対象のGreengrassグループを選択します
  2. Lambdaを選択し、Lambdaの追加を選択します
  3. 新しいLambdaの作成をクリックし、Lambdaの新規作成の画面に遷移します
  4. 関数名に適当な名前を入れて、ランタイムにPython3.7を選択し、関数の作成をクリックします
  5. Lambdaのソースコードに以下の内容を貼り付け、保存をクリックします。今回は、単純にKinesis FirehoseコネクタにMessage from Firehose Connector Testという文字列をPublishしているだけです。
    import greengrasssdk
    import time
    import json
    
    iot_client = greengrasssdk.client('iot-data')
    send_topic = 'kinesisfirehose/message'
    
    def create_request_with_all_fields():
       return  {
           "request": {
               "data": "Message from Firehose Connector Test"
           },
           "id" : 'req_123'
       }
    
    def publish_basic_message():
       messageToPublish = create_request_with_all_fields()
       print(messageToPublish)
       iot_client.publish(topic=send_topic,
           payload=json.dumps(messageToPublish))
    
    def lambda_handler(event, context):
       publish_basic_message()
       return
    
  6. GreengrassにLambdaをデプロイするためには、エイリアスの発行を行わなければならないため、エイリアスの作成を行います。アクションをクリックし新しいバージョンを発行をクリックし、発行をクリックします。その後、エイリアスの作成をクリックし、名前に適当な名前を入れて、バージョンに先ほど発行したバージョン番号を選択します。

  7. Greengrassグループの画面に戻り、既存のLambdaの使用を選択します
  8. 先ほど作ったLambdaを選択し、次へをクリックします
  9. 先ほど発行したエイリアスを選択し、完了をクリックします
  10. GreengrassグループにLambdaが作成されます

Kinesis Firehoseコネクタの作成

次にKinesis Firehoseコネクタを作成します。

  1. 対象のGreengrassグループを選択し、コネクタを選択します
  2. コネクタの追加をクリックします
  3. コネクタの一覧から、Kinesis Firehoseを選択します
  4. Default delivery stream ARNに先ほど作成したKinesis Data FirehoseのArnを設定します
  5. 他の項目については、表示されているデフォルトの設定を入力します
  6. 追加をクリックします

これでコネクタの追加ができました。

サブスクリプションの作成

次にサブスクリプションの設定をします。今回は、testというトピックがパブリッシュされたらGreengrass Lambdaが起動して、Greengrassデバイスからkinesisfirehose/messageトピックに再度パブリッシュします。Kinesis Firehoseコネクタはkinesisfirehose/messageトピックにパブリッシュされたペイロードを受け取り、バッファしたのち、S3に保存するという流れです。

以下のような、サブスクリプションを作成します

ソース ターゲット トピック
Kinesis Firehose IoT Cloud kinesisfirehose/message/status
Greengrass Lambda Kinesis Firehose kinesisfirehose/message
IoT Cloud Greengrass Lambda test
  1. 対象のGreengrassグループを選択し、サブスクリプション選択します
  2. サブスクリプションの追加をクリックします
  3. ソースの選択ターゲットの選択に対象のリソースを選択し、次へをクリックします
  4. トピックフィルターに対応するトピックを入力し、次へをクリックします
  5. 確認し、完了をクリックします
  6. 以下のように設定されていることを確認します

デプロイ

必要な設定が終わったので、デバイスにデプロイします。

  1. 対象のGreengrassグループを選択します
  2. アクションを選択し、デプロイをクリックします
  3. 左上のステータスが正常に完了しましたになれば、デプロイOKです。もし失敗した場合は、証明書の設定などに不備がある可能性があります。

動作確認

デバイスにデプロイできたので、実際に動作確認してみます。

  1. マネージメントコンソールからIoT Coreを選択します
  2. メニューからテストを選択します
  3. トピックへサブスクライブするを選択し、kinesisfirehose/message/statusを入力し、トピックへのサブスクライブをクリックします
  4. トピックへの発行を選択し、testを入力しトピックに発行をクリックします
  5. ちょっとして、サブスクライブしているトピックに以下のようなペイロードが流れてきたら成功です
    {
       "response": [
           {
               "firehose_record_id": "hogehoge",
               "id": "req_123",
               "status": "success"
           }
       ]
    }
    
  6. Kinesis Data Firehoseのの設定で5分間バッファする設定になっているため、5分後に対象のS3バケットを確認し、オブジェクトが作成されていれば成功です

まとめ

Greengrassのコネクタを使ってみました。他にもRasberry PIのGPIOピンを制御するコネクタなど、面白そうなものがあるので、またブログにしてみたいと思います。