AWS Lambda で Amazon Athena のクエリを実行してS3のデータを取得する
Lambda(Python3.6)からAthenaを実行する機会がありましたのでサンプルコードをご紹介します。
Overview
- Event発生時にキーとなる情報を受け取り AWS Lambda が実行される
- Amazon Athenaでクエリを実行し実行結果を取得する
- Athena実行時に作成されたS3オブジェクトは削除する
s3 data
s3 にあるデータは以下とします。
{"name":"user1","email":"user1@example.com"} {"name":"user2","email":"user2@example.com"} {"name":"user3","email":"user3@example.com"}
AWS Lambda
Sample Code
呼び出されるときにname
の情報を取得しAthenaでクエリを実行してレスポンス情報を返します。
import time import boto3 # athena constant DATABASE = 'your_athena_database_name' TABLE = 'your_athena_table_name' # S3 constant S3_OUTPUT = 's3://your_athena_query_output_backet_name' S3_BUCKET = 'your_athena_query_output_backet_name' # number of retries RETRY_COUNT = 10 # query constant COLUMN = 'your_column_name' def lambda_handler(event, context): # get keyword keyword = event['name'] # created query query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword) # athena client client = boto3.client('athena') # Execution response = client.start_query_execution( QueryString=query, QueryExecutionContext={ 'Database': DATABASE }, ResultConfiguration={ 'OutputLocation': S3_OUTPUT, } ) # get query execution id query_execution_id = response['QueryExecutionId'] print(query_execution_id) # get execution status for i in range(1, 1 + RETRY_COUNT): # get query execution query_status = client.get_query_execution(QueryExecutionId=query_execution_id) query_execution_status = query_status['QueryExecution']['Status']['State'] if query_execution_status == 'SUCCEEDED': print("STATUS:" + query_execution_status) break if query_execution_status == 'FAILED': raise Exception("STATUS:" + query_execution_status) else: print("STATUS:" + query_execution_status) time.sleep(i) else: client.stop_query_execution(QueryExecutionId=query_execution_id) raise Exception('TIME OVER') # get query results result = client.get_query_results(QueryExecutionId=query_execution_id) print(result) # get data if len(result['ResultSet']['Rows']) == 2: email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue'] return email else: return None
name
の情報を取得
# get keyword keyword = event['name']
- クエリを作成
# created query query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
- Athena実行
# Execution response = client.start_query_execution( QueryString=query, QueryExecutionContext={ 'Database': DATABASE }, ResultConfiguration={ 'OutputLocation': S3_OUTPUT, } )
- execution idを取得
# get query execution id query_execution_id = response['QueryExecutionId']
- QueryExecutionのステータスを確認
今回は実行時間に時間がかからないことを前提にwhile
でなくfor
でリトライ回数を指定して実行しました。
# get execution status for i in range(1, 1 + RETRY_COUNT): # get query execution query_status = client.get_query_execution(QueryExecutionId=query_execution_id) query_execution_status = query_status['QueryExecution']['Status']['State'] if query_execution_status == 'SUCCEEDED': print("STATUS:" + query_execution_status) break if query_execution_status == 'FAILED': raise Exception("STATUS:" + query_execution_status) else: print("STATUS:" + query_execution_status) time.sleep(i) else: client.stop_query_execution(QueryExecutionId=query_execution_id) raise Exception('TIME OVER')
- Responseを取得
# get query results result = client.get_query_results(QueryExecutionId=query_execution_id)
- Response例
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
レスポンスからemailアドレスの情報を取得したいので以下のように取得します。
# get data if len(result['ResultSet']['Rows']) == 2: email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue'] return email else: return None
実行
- テストイベントを実行
{ "name": "user1" }
- 実行結果
emailアドレスを取得することができました。
S3 バケットに作成されるファイル
Athenaを実行すると以下ファイルがアウトプット先のS3 バケットに作成されファイルが溜まっていくので S3 バケットのライフサイクルポリシーで定期的に削除するようにしておきましょう。
作成されるファイル例
- e0c1ec4c-09a1-11e8-97a4-************.csv
- e0c1ec4c-09a1-11e8-97a4-************.csv.metadata
また、その都度削除していい場合はcodeの中で以下のようにして削除してもいいかもしれません。
# If you want to delete output file # s3 client client = boto3.client('s3') # created s3 object s3_objects_key = [] s3_object_key_csv = query_execution_id + '.csv' s3_objects_key.append({'Key': s3_object_key_csv}) s3_object_key_metadata = query_execution_id + '.csv.metadata' s3_objects_key.append({'Key': s3_object_key_metadata}) # delete s3 object for i in range(1, 1 + RETRY_COUNT): response = client.delete_objects( Bucket=S3_BUCKET, Delete={ 'Objects': s3_objects_key } ) if response['ResponseMetadata']['HTTPStatusCode'] == 200: print("delete %s complete" % s3_objects_key) break else: print(response['ResponseMetadata']['HTTPStatusCode']) time.sleep(i) else: raise Exception('object %s delete failed' % s3_objects_key)
IAM Role
IAM RoleはAmazonAthenaFullAccess
を参考に以下のようにしました。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:*" ], "Resource": [ "*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:DeleteDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:CreateTable", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:DeletePartition", "glue:BatchDeletePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition" ], "Resource": [ "*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::your_athena_query_results_bucket", "arn:aws:s3:::your_athena_query_results_bucket/*" ] }, { "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject" ], "Resource": [ "arn:aws:s3:::your_athena_bucket", "arn:aws:s3:::your_athena_bucket/*" ] } ] }
まとめ
AWS LambdaでAmazon Athnaを動かしてみました。 ではまた。