AWS Lambda で Amazon Athena のクエリを実行してS3のデータを取得する

2018.02.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Lambda(Python3.6)からAthenaを実行する機会がありましたのでサンプルコードをご紹介します。

Overview

  • Event発生時にキーとなる情報を受け取り AWS Lambda が実行される
  • Amazon Athenaでクエリを実行し実行結果を取得する
  • Athena実行時に作成されたS3オブジェクトは削除する

s3 data

s3 にあるデータは以下とします。

{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}

AWS Lambda

Sample Code

呼び出されるときにnameの情報を取得しAthenaでクエリを実行してレスポンス情報を返します。

import time
import boto3

# athena constant
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'

# S3 constant
S3_OUTPUT = 's3://your_athena_query_output_backet_name'
S3_BUCKET = 'your_athena_query_output_backet_name'

# number of retries
RETRY_COUNT = 10

# query constant
COLUMN = 'your_column_name'


def lambda_handler(event, context):

    # get keyword
    keyword = event['name']

    # created query
    query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)

    # athena client
    client = boto3.client('athena')

    # Execution
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    # get query execution id
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    # get execution status
    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)

    # get data
    if len(result['ResultSet']['Rows']) == 2:

        email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']

        return email

    else:
        return None
  • nameの情報を取得
# get keyword
keyword = event['name']
  • クエリを作成
# created query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
  • Athena実行
# Execution
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DATABASE
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
  • execution idを取得
# get query execution id
query_execution_id = response['QueryExecutionId']
  • QueryExecutionのステータスを確認

今回は実行時間に時間がかからないことを前提にwhileでなくforでリトライ回数を指定して実行しました。

# get execution status
for i in range(1, 1 + RETRY_COUNT):

# get query execution
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']

if query_execution_status == 'SUCCEEDED':
print("STATUS:" + query_execution_status)
break

if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)

else:
print("STATUS:" + query_execution_status)
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
  • Responseを取得
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
  • Response例
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}

レスポンスからemailアドレスの情報を取得したいので以下のように取得します。

# get data
if len(result['ResultSet']['Rows']) == 2:

email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']

return email

else:
return None

実行

  • テストイベントを実行
{
"name": "user1"
}
  • 実行結果

画像

emailアドレスを取得することができました。

S3 バケットに作成されるファイル

Athenaを実行すると以下ファイルがアウトプット先のS3 バケットに作成されファイルが溜まっていくので S3 バケットのライフサイクルポリシーで定期的に削除するようにしておきましょう。

作成されるファイル例

  • e0c1ec4c-09a1-11e8-97a4-************.csv
  • e0c1ec4c-09a1-11e8-97a4-************.csv.metadata

また、その都度削除していい場合はcodeの中で以下のようにして削除してもいいかもしれません。

# If you want to delete output file
# s3 client
client = boto3.client('s3')

# created s3 object
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})

# delete s3 object
for i in range(1, 1 + RETRY_COUNT):

response = client.delete_objects(
Bucket=S3_BUCKET,
Delete={
'Objects': s3_objects_key
}
)

if response['ResponseMetadata']['HTTPStatusCode'] == 200:
print("delete %s complete" % s3_objects_key)
break

else:
print(response['ResponseMetadata']['HTTPStatusCode'])
time.sleep(i)

else:
raise Exception('object %s delete failed' % s3_objects_key)

IAM Role

IAM RoleはAmazonAthenaFullAccessを参考に以下のようにしました。

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:*"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:BatchDeletePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::your_athena_query_results_bucket",
"arn:aws:s3:::your_athena_query_results_bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::your_athena_bucket",
"arn:aws:s3:::your_athena_bucket/*"
]
}
]
}

まとめ

AWS LambdaでAmazon Athnaを動かしてみました。 ではまた。

参考URL