この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Lambda(Python3.6)からAthenaを実行する機会がありましたのでサンプルコードをご紹介します。
Overview
- Event発生時にキーとなる情報を受け取り AWS Lambda が実行される
- Amazon Athenaでクエリを実行し実行結果を取得する
- Athena実行時に作成されたS3オブジェクトは削除する
s3 data
s3 にあるデータは以下とします。
{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}
AWS Lambda
Sample Code
呼び出されるときにname
の情報を取得しAthenaでクエリを実行してレスポンス情報を返します。
import time
import boto3
# athena constant
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'
# S3 constant
S3_OUTPUT = 's3://your_athena_query_output_backet_name'
S3_BUCKET = 'your_athena_query_output_backet_name'
# number of retries
RETRY_COUNT = 10
# query constant
COLUMN = 'your_column_name'
def lambda_handler(event, context):
# get keyword
keyword = event['name']
# created query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
# athena client
client = boto3.client('athena')
# Execution
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
# get query execution id
query_execution_id = response['QueryExecutionId']
print(query_execution_id)
# get execution status
for i in range(1, 1 + RETRY_COUNT):
# get query execution
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
print("STATUS:" + query_execution_status)
break
if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)
else:
print("STATUS:" + query_execution_status)
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
print(result)
# get data
if len(result['ResultSet']['Rows']) == 2:
email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
return email
else:
return None
name
の情報を取得
# get keyword
keyword = event['name']
- クエリを作成
# created query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
- Athena実行
# Execution
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
- execution idを取得
# get query execution id
query_execution_id = response['QueryExecutionId']
- QueryExecutionのステータスを確認
今回は実行時間に時間がかからないことを前提にwhile
でなくfor
でリトライ回数を指定して実行しました。
# get execution status
for i in range(1, 1 + RETRY_COUNT):
# get query execution
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
print("STATUS:" + query_execution_status)
break
if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)
else:
print("STATUS:" + query_execution_status)
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
- Responseを取得
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
- Response例
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
レスポンスからemailアドレスの情報を取得したいので以下のように取得します。
# get data
if len(result['ResultSet']['Rows']) == 2:
email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
return email
else:
return None
実行
- テストイベントを実行
{
"name": "user1"
}
- 実行結果
emailアドレスを取得することができました。
S3 バケットに作成されるファイル
Athenaを実行すると以下ファイルがアウトプット先のS3 バケットに作成されファイルが溜まっていくので S3 バケットのライフサイクルポリシーで定期的に削除するようにしておきましょう。
作成されるファイル例
- e0c1ec4c-09a1-11e8-97a4-************.csv
- e0c1ec4c-09a1-11e8-97a4-************.csv.metadata
また、その都度削除していい場合はcodeの中で以下のようにして削除してもいいかもしれません。
# If you want to delete output file
# s3 client
client = boto3.client('s3')
# created s3 object
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})
# delete s3 object
for i in range(1, 1 + RETRY_COUNT):
response = client.delete_objects(
Bucket=S3_BUCKET,
Delete={
'Objects': s3_objects_key
}
)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print("delete %s complete" % s3_objects_key)
break
else:
print(response['ResponseMetadata']['HTTPStatusCode'])
time.sleep(i)
else:
raise Exception('object %s delete failed' % s3_objects_key)
IAM Role
IAM RoleはAmazonAthenaFullAccess
を参考に以下のようにしました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:*"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:BatchDeletePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::your_athena_query_results_bucket",
"arn:aws:s3:::your_athena_query_results_bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::your_athena_bucket",
"arn:aws:s3:::your_athena_bucket/*"
]
}
]
}
まとめ
AWS LambdaでAmazon Athnaを動かしてみました。 ではまた。