AWS Lambda で Amazon Athena のクエリを実行してS3のデータを取得する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Lambda(Python3.6)からAthenaを実行する機会がありましたのでサンプルコードをご紹介します。

Overview

  • Event発生時にキーとなる情報を受け取り AWS Lambda が実行される
  • Amazon Athenaでクエリを実行し実行結果を取得する
  • Athena実行時に作成されたS3オブジェクトは削除する

s3 data

s3 にあるデータは以下とします。

{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}

AWS Lambda

Sample Code

呼び出されるときにnameの情報を取得しAthenaでクエリを実行してレスポンス情報を返します。

  • nameの情報を取得
# get keyword
keyword = event['name']
  • クエリを作成
# created query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
  • Athena実行
# Execution
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)
  • execution idを取得
# get query execution id
query_execution_id = response['QueryExecutionId']
  • QueryExecutionのステータスを確認

今回は実行時間に時間がかからないことを前提にwhileでなくforでリトライ回数を指定して実行しました。

# get execution status
for i in range(1, 1 + RETRY_COUNT):

    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status['QueryExecution']['Status']['State']

    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break

    if query_execution_status == 'FAILED':
        raise Exception("STATUS:" + query_execution_status)

    else:
        print("STATUS:" + query_execution_status)
        time.sleep(i)
else:
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
  • Responseを取得
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
  • Response例
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}

レスポンスからemailアドレスの情報を取得したいので以下のように取得します。

# get data
if len(result['ResultSet']['Rows']) == 2:

    email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']

    return email

else:
    return None

実行

  • テストイベントを実行
{
  "name": "user1"
}
  • 実行結果

画像

emailアドレスを取得することができました。

S3 バケットに作成されるファイル

Athenaを実行すると以下ファイルがアウトプット先のS3 バケットに作成されファイルが溜まっていくので S3 バケットのライフサイクルポリシーで定期的に削除するようにしておきましょう。

作成されるファイル例

  • e0c1ec4c-09a1-11e8-97a4-************.csv
  • e0c1ec4c-09a1-11e8-97a4-************.csv.metadata

また、その都度削除していい場合はcodeの中で以下のようにして削除してもいいかもしれません。

# If you want to delete output file
# s3 client
client = boto3.client('s3')

# created s3 object
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})

# delete s3 object
for i in range(1, 1 + RETRY_COUNT):

    response = client.delete_objects(
        Bucket=S3_BUCKET,
        Delete={
            'Objects': s3_objects_key
        }
    )

    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print("delete %s complete" % s3_objects_key)
        break

    else:
        print(response['ResponseMetadata']['HTTPStatusCode'])
        time.sleep(i)

else:
    raise Exception('object %s delete failed' % s3_objects_key)

IAM Role

IAM RoleはAmazonAthenaFullAccessを参考に以下のようにしました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_query_results_bucket",
                "arn:aws:s3:::your_athena_query_results_bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_bucket",
                "arn:aws:s3:::your_athena_bucket/*"
            ]
        }
    ]
}

まとめ

AWS LambdaでAmazon Athnaを動かしてみました。 ではまた。

参考URL