Amazon TimestreamのデータをLambdaで取得する
Amazon Timestreamに溜まったデータについて、Lambdaで取得してみました。
なお、下記で作成したAmazon Timestreamのデータベースとテーブルを使います。
環境
項目 | バージョン |
---|---|
Python | 3.7 |
boto3 | 1.15.10 |
botocore | 1.18.10 |
AWS SAMでLambdaを作成する
投稿時点において、Lambda標準のboto3バージョンが低いため(boto3:1.14.48、botocore:1.17.48)、そのままではAmazon Timestreamにアクセスできません。
[ERROR] UnknownServiceError: Unknown service: 'timestream-query'. Valid service names are: accessanalyzer, acm, ...(略)
そのため、最新のboto3を一緒にLambdaパッケージに抱き込みます。 手元で試すだけならローカルPCで実行してもOKです。
初期化
AWS SAMで初期化します。
sam init \ --runtime python3.7 \ --name Amazon-Timestream-Lambda-Sample \ --app-template hello-world
テンプレートファイル
手動実行させるため、Lambdaのイベントは設定していません。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Amazon-Timestream-Lambda-Sample Resources: HelloWorldFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.7 Timeout: 10 Policies: - arn:aws:iam::aws:policy/AmazonTimestreamReadOnlyAccess HelloWorldFunctionLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}
Pythonコード
下記を参考に実装します。
import json import boto3 from botocore.config import Config config = Config(region_name = 'us-west-2') config.endpoint_discovery_enabled = True timestream_query_client = boto3.client('timestream-query', config=config) def lambda_handler(event, context): result = timestream_query_client.query( QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10' ) print(result['ColumnInfo']) print(result['Rows'])
requirements.txt
最新のboto3を使いたいので、忘れず記載します。
boto3
デプロイ
sam build sam package \ --output-template-file packaged.yaml \ --s3-bucket cm-fujii.genki-deploy sam deploy \ --template-file packaged.yaml \ --stack-name Amazon-Timestream-Lambda-Sample-Stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset
Lambdaを実行して、Amazon Timestreamからデータを取得する
マネジメントコンソールから直接実行します。
シンプルなデータ取得
まずは、シンプルにデータを取得します。
result = timestream_query_client.query( QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10' )
結果は下記です(見やすいように整形しています)。データ自体は取得できましたが、プログラム的に扱いづらいですね……。
[ {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_value::bigint', 'Type':{'ScalarType': 'BIGINT'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}} ] [ { 'Data': [ {'ScalarValue': 'w7777'}, {'ScalarValue': '1601529802758'}, {'ScalarValue': '7'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:23:26.335000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529781165'}, {'ScalarValue': '20'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:23:03.793000000'} ] }, ...(略)..., { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529491031'}, {'ScalarValue': '10'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:18:45.884000000'} ] } ]
デバイスIDを指定してデータ取得
デバイスIDがw0001
のデータを取得します。
result = timestream_query_client.query( QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ' 'WHERE deviceId = \'w0001\' ORDER BY time DESC LIMIT 10' )
結果は下記です。バッチリ取得できていますね。
[ {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}} ] [ { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529781165'}, {'ScalarValue': '20'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:23:03.793000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529730510'}, {'ScalarValue': '22'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:22:16.162000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529720614'}, {'ScalarValue': '17'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:22:07.331000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529708804'}, {'ScalarValue': '12'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:21:55.518000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529491031'}, {'ScalarValue': '10'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:18:45.884000000'} ] } ]
デバイスIDと時刻を指定してデータ取得
デバイスIDがw0001
のデータのうち、Start/Endの時刻の範囲にあるデータを取得してみました。
result = timestream_query_client.query( QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ' 'WHERE deviceId = \'w0001\' AND ' 'timestamp BETWEEN \'1601529480000\' AND \'1601529720610 \' ' 'ORDER BY time DESC LIMIT 10' )
結果は下記です。これもOKですね。
[ {'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}} ] [ { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529708804'}, {'ScalarValue': '12'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:21:55.518000000'} ] }, { 'Data': [ {'ScalarValue': 'w0001'}, {'ScalarValue': '1601529491031'}, {'ScalarValue': '10'}, {'ScalarValue': 'waterLevel'}, {'ScalarValue': '2020-10-01 05:18:45.884000000'} ] } ]
さいごに
最新のboto3を使う必要はありますが、お手軽にできました。それにしても、VPCは不要なのですね。これは嬉しいです。