Amazon TimestreamのデータをLambdaで取得する

Amazon Timestreamに溜まったデータについて、Lambdaで取得してみました。 VPC設定など不要のため、お手軽に取得できました。
2020.10.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon Timestreamに溜まったデータについて、Lambdaで取得してみました。

なお、下記で作成したAmazon Timestreamのデータベースとテーブルを使います。

環境

項目 バージョン
Python 3.7
boto3 1.15.10
botocore 1.18.10

AWS SAMでLambdaを作成する

投稿時点において、Lambda標準のboto3バージョンが低いため(boto3:1.14.48、botocore:1.17.48)、そのままではAmazon Timestreamにアクセスできません。

[ERROR] UnknownServiceError: Unknown service: 'timestream-query'.
Valid service names are: accessanalyzer, acm, ...(略)

そのため、最新のboto3を一緒にLambdaパッケージに抱き込みます。 手元で試すだけならローカルPCで実行してもOKです。

初期化

AWS SAMで初期化します。

sam init \
    --runtime python3.7 \
    --name Amazon-Timestream-Lambda-Sample \
    --app-template hello-world

テンプレートファイル

手動実行させるため、Lambdaのイベントは設定していません。

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Amazon-Timestream-Lambda-Sample

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.7
      Timeout: 10
      Policies:
        - arn:aws:iam::aws:policy/AmazonTimestreamReadOnlyAccess

  HelloWorldFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}

Pythonコード

下記を参考に実装します。

app.py

import json
import boto3
from botocore.config import Config

config = Config(region_name = 'us-west-2')
config.endpoint_discovery_enabled = True
timestream_query_client = boto3.client('timestream-query', config=config)


def lambda_handler(event, context):
    result = timestream_query_client.query(
        QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10'
    )

    print(result['ColumnInfo'])
    print(result['Rows'])

requirements.txt

最新のboto3を使いたいので、忘れず記載します。

boto3

デプロイ

sam build

sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-deploy

sam deploy \
    --template-file packaged.yaml \
    --stack-name Amazon-Timestream-Lambda-Sample-Stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset

Lambdaを実行して、Amazon Timestreamからデータを取得する

マネジメントコンソールから直接実行します。

シンプルなデータ取得

まずは、シンプルにデータを取得します。

result = timestream_query_client.query(
    QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10'
)

結果は下記です(見やすいように整形しています)。データ自体は取得できましたが、プログラム的に扱いづらいですね……。

[
    {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'measure_value::bigint', 'Type':{'ScalarType': 'BIGINT'}},
    {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]

[
    {
        'Data': [
            {'ScalarValue': 'w7777'},
            {'ScalarValue': '1601529802758'},
            {'ScalarValue': '7'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:23:26.335000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529781165'},
            {'ScalarValue': '20'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:23:03.793000000'}
        ]
    },
    ...(略)...,
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529491031'},
            {'ScalarValue': '10'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:18:45.884000000'}
        ]
    }
]

デバイスIDを指定してデータ取得

デバイスIDがw0001のデータを取得します。

result = timestream_query_client.query(
    QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" '
                'WHERE deviceId = \'w0001\' ORDER BY time DESC LIMIT 10'
)

結果は下記です。バッチリ取得できていますね。

[
    {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}},
    {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]

[
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529781165'},
            {'ScalarValue': '20'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:23:03.793000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529730510'},
            {'ScalarValue': '22'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:22:16.162000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529720614'},
            {'ScalarValue': '17'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:22:07.331000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529708804'},
            {'ScalarValue': '12'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:21:55.518000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529491031'},
            {'ScalarValue': '10'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:18:45.884000000'}
        ]
    }
]

デバイスIDと時刻を指定してデータ取得

デバイスIDがw0001のデータのうち、Start/Endの時刻の範囲にあるデータを取得してみました。

result = timestream_query_client.query(
    QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" '
                'WHERE deviceId = \'w0001\' AND '
                'timestamp BETWEEN \'1601529480000\' AND \'1601529720610 \' '
                'ORDER BY time DESC LIMIT 10'
)

結果は下記です。これもOKですね。

[
    {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}},
    {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]

[
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529708804'},
            {'ScalarValue': '12'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:21:55.518000000'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'w0001'},
            {'ScalarValue': '1601529491031'},
            {'ScalarValue': '10'},
            {'ScalarValue': 'waterLevel'},
            {'ScalarValue': '2020-10-01 05:18:45.884000000'}
        ]
    }
]

さいごに

最新のboto3を使う必要はありますが、お手軽にできました。それにしても、VPCは不要なのですね。これは嬉しいです。

参考