Run Amazon Athena queries with AWS Lambda

2018.04.02

This article shows how to run Amazon Athena queries from AWS Lambda (Python 3.6).

Overview

  • Receive the key data from the event that invokes the AWS Lambda function.
  • Run a query on Amazon Athena and get the result of the execution.
  • Delete the S3 objects that Athena created during the execution.

s3 data

A sample of the S3 data is as follows:

{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}

AWS Lambda

Sample Code

import time
import boto3

# Athena settings: the database and table that the lookup query targets.
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'

# S3 settings: the bucket that receives Athena's query output, and the same
# bucket expressed as the s3:// URI that Athena expects as its output location.
S3_BUCKET = 'your_athena_query_output_backet_name'
S3_OUTPUT = 's3://' + S3_BUCKET

# Maximum number of attempts made by the retry loops.
RETRY_COUNT = 10

# Query settings: the column compared against the keyword in the WHERE clause.
COLUMN = 'your_column_name'


def lambda_handler(event, context):
    """Look up a record in Athena by keyword and return its email address.

    Args:
        event: Invocation payload; must contain a 'name' key holding the
            value matched against COLUMN.
        context: Lambda context object (unused).

    Returns:
        The value of the second column of the single matching row, or None
        when the query does not return exactly one data row or when that
        cell is NULL.

    Raises:
        KeyError: If the event has no 'name' key.
        Exception: If the query ends in FAILED or CANCELLED state, or does
            not finish within RETRY_COUNT polls ('TIME OVER').
    """
    # get keyword
    keyword = event['name']

    # The keyword comes from the caller, so double any single quote to keep
    # it from terminating the SQL string literal (SQL injection).
    safe_keyword = str(keyword).replace("'", "''")

    # create query
    query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, safe_keyword)

    # athena client
    client = boto3.client('athena')

    # Execution
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    # get query execution id
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    # Poll the execution status, waiting one second longer on each attempt.
    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_status['QueryExecution']['Status']
        query_execution_status = status['State']
        print("STATUS:" + query_execution_status)

        if query_execution_status == 'SUCCEEDED':
            break

        # FAILED and CANCELLED are both terminal: further polling cannot succeed.
        if query_execution_status in ('FAILED', 'CANCELLED'):
            message = "STATUS:" + query_execution_status
            reason = status.get('StateChangeReason')
            if reason:
                message += " (%s)" % reason
            raise Exception(message)

        # Still QUEUED / RUNNING.
        time.sleep(i)
    else:
        # Retries exhausted: stop the query so it does not keep running.
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)

    # get data
    # Exactly two rows are expected: the column-header row plus one match.
    rows = result['ResultSet']['Rows']
    if len(rows) != 2:
        return None

    # A NULL cell carries no 'VarCharValue' key, so use .get() rather than
    # indexing to return None instead of raising KeyError.
    return rows[1]['Data'][1].get('VarCharValue')

When the Lambda function is invoked, it reads the name from the event, runs the query on Athena, and returns the result.

  • Get name data
# get keyword: read the 'name' field of the invocation event
keyword = event['name']
  • Create query
# create query
# The keyword comes from the caller, so double any single quote to keep it
# from terminating the SQL string literal (SQL injection).
safe_keyword = str(keyword).replace("'", "''")
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, safe_keyword)
  • Run query with Athena
# Execution: start the query; DATABASE is the query context and S3_OUTPUT is
# passed as the output location for the result files.
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': DATABASE},
    ResultConfiguration={'OutputLocation': S3_OUTPUT},
)
  • Get execution id
# get query execution id: identifies this run in the later status and result calls
query_execution_id = response['QueryExecutionId']
  • Check the status of QueryExecution

Assuming that the query does not take long to run, we poll with a for loop and a fixed retry count instead of an open-ended while loop.

# get execution status
# Poll the execution status, waiting one second longer on each attempt.
for i in range(1, 1 + RETRY_COUNT):

    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    status = query_status['QueryExecution']['Status']
    query_execution_status = status['State']
    print("STATUS:" + query_execution_status)

    if query_execution_status == 'SUCCEEDED':
        break

    # FAILED and CANCELLED are both terminal: further polling cannot succeed.
    if query_execution_status in ('FAILED', 'CANCELLED'):
        message = "STATUS:" + query_execution_status
        reason = status.get('StateChangeReason')
        if reason:
            message += " (%s)" % reason
        raise Exception(message)

    # Still QUEUED / RUNNING.
    time.sleep(i)
else:
    # Retries exhausted: stop the query so it does not keep running.
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
  • Get Response
# get query results: fetch the result set of the finished execution
result = client.get_query_results(QueryExecutionId=query_execution_id)
  • Example Response
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}

The following code extracts the email address from the response:

# get data
if len(result['ResultSet']['Rows']) == 2:

email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']

return email

else:
return None

Execution

  • Execution test event
{
"name": "user1"
}
  • Execution Result

[Image: screenshot of the execution result]

We successfully obtained the email address.

Created file in the S3 bucket

When an Athena query is executed, the following files are created in the S3 bucket configured as the output destination. We recommend deleting them regularly using the S3 bucket's lifecycle policy.

  • file example e0c1ec4c-09a1-11e8-97a4-************.csv e0c1ec4c-09a1-11e8-97a4-************.csv.metadata

Alternatively, if you want to delete the files after every execution, you can delete them in code as follows.

# If you want to delete output file
# s3 client (named separately so the Athena `client` is not overwritten)
s3_client = boto3.client('s3')

# Keys of the two objects Athena wrote for this execution.
# NOTE(review): assumes the output location is the bucket root; if it includes
# a prefix, the keys need that prefix too.
s3_objects_key = [
    {'Key': query_execution_id + '.csv'},
    {'Key': query_execution_id + '.csv.metadata'},
]

# delete s3 object, waiting one second longer on each attempt
for i in range(1, 1 + RETRY_COUNT):

    response = s3_client.delete_objects(
        Bucket=S3_BUCKET,
        Delete={
            'Objects': s3_objects_key
        }
    )

    # delete_objects can answer HTTP 200 while still reporting per-key
    # failures in 'Errors', so both must be checked.
    errors = response.get('Errors', [])
    if response['ResponseMetadata']['HTTPStatusCode'] == 200 and not errors:
        print("delete %s complete" % s3_objects_key)
        break

    print(response['ResponseMetadata']['HTTPStatusCode'], errors)
    time.sleep(i)

else:
    # Retries exhausted without a clean delete.
    raise Exception('object %s delete failed' % s3_objects_key)

IAM Role

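We created an IAM role with the following policy, based on AmazonAthenaFullAccess. Note that if you delete the output files from code as shown above, the role additionally needs the s3:DeleteObject permission on the query output bucket, which this policy does not include.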

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:*"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:BatchDeletePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::your_athena_query_results_bucket",
"arn:aws:s3:::your_athena_query_results_bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::your_athena_bucket",
"arn:aws:s3:::your_athena_bucket/*"
]
}
]
}

Conclusion

We explained how to run Amazon Athena using AWS Lambda (Python) based on sample code.

References