この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Amazon Timestreamに溜まったデータについて、Lambdaで取得してみました。
なお、下記で作成したAmazon Timestreamのデータベースとテーブルを使います。
環境
項目 | バージョン |
---|---|
Python | 3.7 |
boto3 | 1.15.10 |
botocore | 1.18.10 |
AWS SAMでLambdaを作成する
投稿時点において、Lambda標準のboto3バージョンが低いため(boto3:1.14.48、botocore:1.17.48)、そのままではAmazon Timestreamにアクセスできません。
[ERROR] UnknownServiceError: Unknown service: 'timestream-query'.
Valid service names are: accessanalyzer, acm, ...(略)
そのため、最新のboto3を一緒にLambdaパッケージに抱き込みます。 手元で試すだけならローカルPCで実行してもOKです。
初期化
AWS SAMで初期化します。
sam init \
--runtime python3.7 \
--name Amazon-Timestream-Lambda-Sample \
--app-template hello-world
テンプレートファイル
手動実行させるため、Lambdaのイベントは設定していません。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Amazon-Timestream-Lambda-Sample
Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello_world/
Handler: app.lambda_handler
Runtime: python3.7
Timeout: 10
Policies:
- arn:aws:iam::aws:policy/AmazonTimestreamReadOnlyAccess
HelloWorldFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${HelloWorldFunction}
Pythonコード
下記を参考に実装します。
app.py
import json
import boto3
from botocore.config import Config
config = Config(region_name = 'us-west-2')
config.endpoint_discovery_enabled = True
timestream_query_client = boto3.client('timestream-query', config=config)
def lambda_handler(event, context):
result = timestream_query_client.query(
QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10'
)
print(result['ColumnInfo'])
print(result['Rows'])
requirements.txt
最新のboto3を使いたいので、忘れず記載します。
boto3
デプロイ
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket cm-fujii.genki-deploy
sam deploy \
--template-file packaged.yaml \
--stack-name Amazon-Timestream-Lambda-Sample-Stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
Lambdaを実行して、Amazon Timestreamからデータを取得する
マネジメントコンソールから直接実行します。
シンプルなデータ取得
まずは、シンプルにデータを取得します。
result = timestream_query_client.query(
QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" ORDER BY time DESC LIMIT 10'
)
結果は下記です(見やすいように整形しています)。データ自体は取得できましたが、プログラム的に扱いづらいですね……。
[
{'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'measure_value::bigint', 'Type':{'ScalarType': 'BIGINT'}},
{'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]
[
{
'Data': [
{'ScalarValue': 'w7777'},
{'ScalarValue': '1601529802758'},
{'ScalarValue': '7'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:23:26.335000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529781165'},
{'ScalarValue': '20'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:23:03.793000000'}
]
},
...(略)...,
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529491031'},
{'ScalarValue': '10'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:18:45.884000000'}
]
}
]
デバイスIDを指定してデータ取得
デバイスIDがw0001
のデータを取得します。
result = timestream_query_client.query(
QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" '
'WHERE deviceId = \'w0001\' ORDER BY time DESC LIMIT 10'
)
結果は下記です。バッチリ取得できていますね。
[
{'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}},
{'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]
[
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529781165'},
{'ScalarValue': '20'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:23:03.793000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529730510'},
{'ScalarValue': '22'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:22:16.162000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529720614'},
{'ScalarValue': '17'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:22:07.331000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529708804'},
{'ScalarValue': '12'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:21:55.518000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529491031'},
{'ScalarValue': '10'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:18:45.884000000'}
]
}
]
デバイスIDと時刻を指定してデータ取得
デバイスIDがw0001
のデータのうち、Start/Endの時刻の範囲にあるデータを取得してみました。
result = timestream_query_client.query(
QueryString='SELECT * FROM "IoT-Sample-Database"."water-level-table" '
'WHERE deviceId = \'w0001\' AND '
'timestamp BETWEEN \'1601529480000\' AND \'1601529720610 \' '
'ORDER BY time DESC LIMIT 10'
)
結果は下記です。これもOKですね。
[
{'Name': 'deviceId', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'timestamp', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'measure_value::bigint', 'Type': {'ScalarType': 'BIGINT'}},
{'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
{'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}
]
[
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529708804'},
{'ScalarValue': '12'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:21:55.518000000'}
]
},
{
'Data': [
{'ScalarValue': 'w0001'},
{'ScalarValue': '1601529491031'},
{'ScalarValue': '10'},
{'ScalarValue': 'waterLevel'},
{'ScalarValue': '2020-10-01 05:18:45.884000000'}
]
}
]
さいごに
最新のboto3を使う必要はありますが、お手軽にできました。それにしても、VPCは不要なのですね。これは嬉しいです。