Run Amazon Athena queries with AWS Lambda


This article shows how to run Amazon Athena queries from AWS Lambda (Python 3.6).

Overview

  • Receive the key data when an event is published and the AWS Lambda function is invoked.
  • Run a query on Amazon Athena and get the result of the execution.
  • Delete the S3 objects created by the Athena execution.

S3 data

The sample data in S3 looks like this:

{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}

AWS Lambda

Sample Code

When the Lambda function is invoked, it reads the name from the event, runs the query on Athena, and returns the result.
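The snippets in this section rely on a few module-level names (`client`, `DATABASE`, `TABLE`, `COLUMN`, `S3_OUTPUT`, `S3_BUCKET`, `RETRY_COUNT`) that the article does not define. A minimal sketch of that setup might look as follows; the environment variable names and default values are assumptions for illustration, not part of the original code.

```python
import os

# Hypothetical environment variable names and defaults; adjust to your setup.
DATABASE = os.environ.get('ATHENA_DATABASE', 'your_database')
TABLE = os.environ.get('ATHENA_TABLE', 'your_table')
COLUMN = os.environ.get('ATHENA_COLUMN', 'name')

# Bucket that receives Athena's query result files
S3_BUCKET = os.environ.get('ATHENA_OUTPUT_BUCKET', 'your_athena_query_results_bucket')
S3_OUTPUT = 's3://' + S3_BUCKET

# Maximum number of status checks before giving up
RETRY_COUNT = int(os.environ.get('ATHENA_RETRY_COUNT', '10'))


def get_athena_client():
    """Create the Athena client used as `client` in the snippets."""
    import boto3  # imported here so the module also loads where boto3 is absent
    return boto3.client('athena')
```

In the Lambda function itself, `client = get_athena_client()` would be called once at module level, and `import time` is also needed for the polling loop.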

  • Get name data
# get keyword
keyword = event['name']
  • Create query
# create query (single quotes are doubled so the keyword cannot end the string literal)
query = "SELECT * FROM %s.%s WHERE %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword.replace("'", "''"))
  • Run query with Athena
# Execution
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)
  • Get execution id
# get query execution id
query_execution_id = response['QueryExecutionId']
  • Check the status of QueryExecution

Assuming the query finishes quickly, we poll the status in a for loop with a fixed retry count rather than in an unbounded while loop.

# get execution status
for i in range(1, 1 + RETRY_COUNT):

    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status['QueryExecution']['Status']['State']

    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break

    # FAILED and CANCELLED are both final states, so stop polling
    if query_execution_status in ('FAILED', 'CANCELLED'):
        raise Exception("STATUS:" + query_execution_status)

    else:
        print("STATUS:" + query_execution_status)
        time.sleep(i)
else:
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
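
Because the loop sleeps `i` seconds on attempt `i`, the worst-case wait grows quadratically with `RETRY_COUNT`, and it has to fit inside the Lambda function's timeout. A small sketch of that arithmetic (the helper name `max_wait_seconds` is hypothetical, and the time spent in the API calls themselves is ignored):

```python
def max_wait_seconds(retry_count):
    """Total sleep time if the query never reaches a final state.

    The loop sleeps 1, 2, ..., retry_count seconds, so the total is
    retry_count * (retry_count + 1) / 2.
    """
    return retry_count * (retry_count + 1) // 2


for retry_count in (5, 10, 20):
    print(retry_count, max_wait_seconds(retry_count))
# 5 15
# 10 55
# 20 210
```

With `RETRY_COUNT = 10`, for example, the loop can sleep for up to 55 seconds, so the function timeout must be raised well above Lambda's default of 3 seconds.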
  • Get Response
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
  • Example Response
{
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]},
            {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}
        ],
        'ResultSetMetadata': {
            'ColumnInfo': [
                {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True},
                {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}
            ]
        }
    },
    'ResponseMetadata': {
        'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************',
        'HTTPStatusCode': 200,
        'HTTPHeaders': {
            'content-type': 'application/x-amz-json-1.1',
            'date': 'Sun, 04 Feb 2018 11:52:30 GMT',
            'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************',
            'content-length': '1026',
            'connection': 'keep-alive'
        },
        'RetryAttempts': 0
    }
}

The following code extracts the email address from the response. The first row holds the column names, so a single match gives exactly two rows:

# get data
if len(result['ResultSet']['Rows']) == 2:

    email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']

    return email

else:
    return None
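
The check above handles exactly one matching row. When a query can return several rows, it may be easier to turn the `ResultSet` into a list of dictionaries keyed by column name. A minimal sketch, assuming the first row holds the column headers (as in the example response) and using a hypothetical helper name `rows_to_dicts`:

```python
def rows_to_dicts(result):
    """Convert a get_query_results response into a list of dicts."""
    rows = result['ResultSet']['Rows']
    if not rows:
        return []

    # The first row carries the column names
    header = [col.get('VarCharValue') for col in rows[0]['Data']]

    records = []
    for row in rows[1:]:
        # .get() turns a cell without 'VarCharValue' into None instead of raising KeyError
        values = [col.get('VarCharValue') for col in row['Data']]
        records.append(dict(zip(header, values)))
    return records


# Trimmed version of the example response
sample = {'ResultSet': {'Rows': [
    {'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]},
    {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]},
]}}
print(rows_to_dicts(sample))
# [{'name': 'user1', 'email': 'user1@example.com'}]
```

Note that `get_query_results` is paginated, so large results need `NextToken` handling, which this sketch omits.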

Execution

  • Execution test event
{
  "name": "user1"
}
  • Execution Result

[Image: execution result]

We successfully obtained the email address.
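
To exercise the same flow without calling AWS, the steps can be wrapped in a function that receives the client as an argument and then run against a stub. This is a rough sketch: `lookup_email`, `StubAthena`, and the constant values are hypothetical names for illustration, and the stub only imitates the three Athena calls needed on the success path.

```python
import time

# Placeholder values; replace with your own database, table, and bucket
DATABASE, TABLE, COLUMN = 'your_database', 'your_table', 'name'
S3_OUTPUT = 's3://your_athena_query_results_bucket'
RETRY_COUNT = 3


def lookup_email(client, event):
    """Run the query for event['name'] and return the email, or None."""
    # Double single quotes so the keyword stays inside the string literal
    keyword = event['name'].replace("'", "''")
    query = "SELECT * FROM %s.%s WHERE %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': S3_OUTPUT},
    )
    query_execution_id = response['QueryExecutionId']

    # Poll until the query reaches a final state or the retries run out
    for i in range(1, 1 + RETRY_COUNT):
        status = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = status['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'CANCELLED'):
            raise Exception("STATUS:" + state)
        time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    rows = client.get_query_results(QueryExecutionId=query_execution_id)['ResultSet']['Rows']
    if len(rows) == 2:
        return rows[1]['Data'][1]['VarCharValue']
    return None


class StubAthena:
    """Stand-in for the Athena client that always returns user1's row."""

    def start_query_execution(self, **kwargs):
        return {'QueryExecutionId': 'stub-id'}

    def get_query_execution(self, QueryExecutionId):
        return {'QueryExecution': {'Status': {'State': 'SUCCEEDED'}}}

    def get_query_results(self, QueryExecutionId):
        return {'ResultSet': {'Rows': [
            {'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]},
            {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]},
        ]}}


print(lookup_email(StubAthena(), {'name': 'user1'}))
# user1@example.com
```

In the real function, a handler would pass the boto3 Athena client and the incoming `event` to the same function.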

Files created in the S3 bucket

Each time Athena runs a query, it creates the following files in the output S3 bucket. We recommend deleting them regularly with the S3 bucket's lifecycle policy.

  • Example files:
    e0c1ec4c-09a1-11e8-97a4-************.csv
    e0c1ec4c-09a1-11e8-97a4-************.csv.metadata
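
A lifecycle rule of that kind can be set with boto3's `put_bucket_lifecycle_configuration`. A minimal sketch, assuming the result files should expire after one day; the rule ID, prefix, and retention period are illustrative values, and `build_lifecycle_configuration` and `apply_lifecycle` are hypothetical helpers:

```python
def build_lifecycle_configuration(prefix='', days=1):
    """Build a lifecycle configuration that expires Athena result files."""
    return {
        'Rules': [
            {
                'ID': 'expire-athena-query-results',  # illustrative rule name
                'Filter': {'Prefix': prefix},         # '' applies to the whole bucket
                'Status': 'Enabled',
                'Expiration': {'Days': days},
            }
        ]
    }


def apply_lifecycle(s3_client, bucket, prefix='', days=1):
    """Apply the rule; s3_client is expected to be boto3.client('s3')."""
    return s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=build_lifecycle_configuration(prefix, days),
    )
```

This call replaces any lifecycle configuration already on the bucket, and the caller needs the `s3:PutLifecycleConfiguration` permission, so it is usually run once by an administrator rather than from the Lambda function.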

Alternatively, if you want to delete the files after every execution, you can do it in code as follows.

# If you want to delete the output files
# S3 client (a separate name, so the Athena client is not overwritten)
s3_client = boto3.client('s3')

# S3 objects created by the query (keys assume the output location is the bucket root)
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})

# delete S3 objects
for i in range(1, 1 + RETRY_COUNT):

    response = s3_client.delete_objects(
        Bucket=S3_BUCKET,
        Delete={
            'Objects': s3_objects_key
        }
    )

    # delete_objects can return HTTP 200 and still report per-key failures in 'Errors'
    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and not response.get('Errors'):
        print("delete %s complete" % s3_objects_key)
        break

    else:
        print(response['ResponseMetadata']['HTTPStatusCode'], response.get('Errors'))
        time.sleep(i)

else:
    raise Exception('object %s delete failed' % s3_objects_key)
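
The keys above assume the result files sit at the root of `S3_BUCKET`. If `S3_OUTPUT` includes a prefix, the keys need that prefix too. One way to avoid guessing is to read the result location from the `get_query_execution` response (`QueryExecution`, `ResultConfiguration`, `OutputLocation`), which for a finished SELECT query typically holds the full path of the `.csv` file, and split it into bucket and key. A minimal sketch; `split_s3_uri` and `result_object_keys` are hypothetical helpers, and the example path is made up:

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/path/to/key' into ('bucket', 'path/to/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError('not an S3 URI: %s' % uri)
    return parsed.netloc, parsed.path.lstrip('/')


def result_object_keys(output_location):
    """Return the bucket and the Objects list for delete_objects."""
    bucket, csv_key = split_s3_uri(output_location)
    objects = [{'Key': csv_key}, {'Key': csv_key + '.metadata'}]
    return bucket, objects


# output_location would come from
# query_status['QueryExecution']['ResultConfiguration']['OutputLocation']
bucket, objects = result_object_keys('s3://your_athena_query_results_bucket/results/abc.csv')
print(bucket)
print(objects)
# your_athena_query_results_bucket
# [{'Key': 'results/abc.csv'}, {'Key': 'results/abc.csv.metadata'}]
```

The returned `bucket` and `objects` can then be passed as `Bucket` and `Delete={'Objects': objects}` to `delete_objects`.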

IAM Role

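We created an IAM role with the following policy, based on AmazonAthenaFullAccess. If you delete the output files in code, the role also needs s3:DeleteObject on the query results bucket.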

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_query_results_bucket",
                "arn:aws:s3:::your_athena_query_results_bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_bucket",
                "arn:aws:s3:::your_athena_bucket/*"
            ]
        }
    ]
}

Conclusion

We explained how to run Amazon Athena queries from AWS Lambda (Python), using sample code.
