We introduce how to run Amazon Athena queries from AWS Lambda (Python 3.6).
Overview
- Receive key data when an event is published and the Lambda function is executed.
- Run a query on Amazon Athena and get the result of the execution.
- Delete the S3 objects created by the Athena execution.
S3 data
The sample data in S3 is as follows (one JSON object per line):
{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}
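The steps below assume an Athena table already exists over this data. As a sketch, a table for this newline-delimited JSON could be created with the JSON SerDe; the database, table, and bucket names are this article's placeholders, not a specific existing table:

```python
# Sketch of a DDL statement for the sample JSON data above.
# Database, table, and bucket names are this article's placeholders.
CREATE_TABLE_DDL = """
CREATE EXTERNAL TABLE IF NOT EXISTS your_athena_database_name.your_athena_table_name (
  name  string,
  email string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your_athena_bucket/'
"""
```

This DDL can be run once from the Athena console, or passed as `QueryString` to the same `start_query_execution` call used later in this article.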
AWS Lambda
Sample Code
import time
import boto3

# athena constant
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'

# S3 constant
S3_OUTPUT = 's3://your_athena_query_output_bucket_name'
S3_BUCKET = 'your_athena_query_output_bucket_name'

# number of retries
RETRY_COUNT = 10

# query constant
COLUMN = 'your_column_name'


def lambda_handler(event, context):
    # get keyword
    keyword = event['name']

    # create query
    query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)

    # athena client
    client = boto3.client('athena')

    # Execution
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    # get query execution id
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    # get execution status
    for i in range(1, 1 + RETRY_COUNT):

        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        # cancel the query if it did not finish within the retry window
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)

    # get data
    if len(result['ResultSet']['Rows']) == 2:
        email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
        return email
    else:
        return None
Get the name data from the event when the Lambda function is invoked, run the query on Athena, and return the response.
- Get name data
# get keyword
keyword = event['name']
- Create query
# create query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
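Note that interpolating event input directly into the query string lets a stray or malicious single quote alter the SQL. As a minimal precaution (a sketch, not a full sanitizer), the standard SQL escape is to double any single quotes in the keyword before building the query; `build_query` below is an illustrative helper, not part of the original sample:

```python
def build_query(database, table, column, keyword):
    """Build the SELECT statement, doubling single quotes in the
    user-supplied keyword (the standard SQL string escape)."""
    safe = keyword.replace("'", "''")
    return "SELECT * FROM %s.%s where %s = '%s';" % (database, table, column, safe)

# a keyword containing a quote no longer breaks out of the string literal
print(build_query('db', 't', 'name', "o'brien"))
# → SELECT * FROM db.t where name = 'o''brien';
```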
- Run query with Athena
# Execution
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)
- Get execution id
# get query execution id
query_execution_id = response['QueryExecutionId']
- Check the status of QueryExecution
Assuming the query does not take long to run, we poll with a for loop and a fixed retry count instead of an unbounded while loop.
# get execution status
for i in range(1, 1 + RETRY_COUNT):

    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status['QueryExecution']['Status']['State']

    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break

    if query_execution_status == 'FAILED':
        raise Exception("STATUS:" + query_execution_status)

    else:
        print("STATUS:" + query_execution_status)
        time.sleep(i)
else:
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
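With `RETRY_COUNT = 10` and `time.sleep(i)`, the waits grow linearly (1 s, 2 s, ..., 10 s), so the handler polls for at most 1 + 2 + ... + 10 = 55 seconds before stopping the query. This is worth checking against the Lambda function's timeout setting:

```python
RETRY_COUNT = 10

# time.sleep(i) for i = 1..RETRY_COUNT is a linearly growing backoff;
# the worst case before 'TIME OVER' is the sum of all the sleeps
max_wait = sum(range(1, 1 + RETRY_COUNT))
print(max_wait)  # → 55
```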
- Get Response
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
- Example Response
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]}, {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': 'user1@example.com'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name', 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}, {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email', 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0, 'Nullable': 'UNKNOWN', 'CaseSensitive': True}]}}, 'ResponseMetadata': {'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 04 Feb 2018 11:52:30 GMT', 'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************', 'content-length': '1026', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
We get email address data from the response in the following example code:
# get data
if len(result['ResultSet']['Rows']) == 2:
    email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
    return email
else:
    return None
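The first row returned by get_query_results is always the column header, which is why the single-match check above looks for exactly two rows. For queries that can match several rows, a slightly more general sketch pairs each data row with the header to build dicts (columns with NULL values come back without a 'VarCharValue' key, which `.get` handles):

```python
def rows_to_dicts(result):
    """Convert an Athena get_query_results payload into a list of dicts,
    using the first (header) row for the keys. Illustrative helper, not
    part of the original sample."""
    rows = result['ResultSet']['Rows']
    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    return [
        dict(zip(header, [col.get('VarCharValue') for col in row['Data']]))
        for row in rows[1:]
    ]

# applied to the example response above, this yields:
# [{'name': 'user1', 'email': 'user1@example.com'}]
```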
Execution
- Execution test event
{
"name": "user1"
}
- Execution Result
We successfully obtained the email address.
Files created in the S3 bucket
When Athena runs, the following files are created in the output destination S3 bucket. We recommend deleting them regularly using the S3 bucket's lifecycle policy.
- file example
e0c1ec4c-09a1-11e8-97a4-************.csv
e0c1ec4c-09a1-11e8-97a4-************.csv.metadata
Alternatively, if you want to delete the files after each execution, you can delete them in code as follows.
# If you want to delete output file
# s3 client
client = boto3.client('s3')

# created s3 object
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})

# delete s3 object
for i in range(1, 1 + RETRY_COUNT):
    response = client.delete_objects(
        Bucket=S3_BUCKET,
        Delete={
            'Objects': s3_objects_key
        }
    )
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print("delete %s complete" % s3_objects_key)
        break
    else:
        print(response['ResponseMetadata']['HTTPStatusCode'])
        time.sleep(i)
else:
    raise Exception('object %s delete failed' % s3_objects_key)
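The delete code above assumes OutputLocation points at the bucket root; if it includes a key prefix (the `athena-results/` name below is purely illustrative), the object keys must include that prefix too. A small sketch of building the keys either way:

```python
def output_object_keys(query_execution_id, prefix=''):
    """Return the result-file keys Athena writes for one execution:
    <id>.csv and <id>.csv.metadata, under an optional key prefix taken
    from OutputLocation ('' when results go to the bucket root)."""
    base = prefix + query_execution_id
    return [{'Key': base + '.csv'}, {'Key': base + '.csv.metadata'}]

print(output_object_keys('abc', 'athena-results/'))
# → [{'Key': 'athena-results/abc.csv'}, {'Key': 'athena-results/abc.csv.metadata'}]
```

The returned list can be passed directly as the 'Objects' value in the delete_objects call above.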
IAM Role
We created an IAM role based on the AmazonAthenaFullAccess managed policy, as follows.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_query_results_bucket",
                "arn:aws:s3:::your_athena_query_results_bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_bucket",
                "arn:aws:s3:::your_athena_bucket/*"
            ]
        }
    ]
}
Conclusion
We explained how to run Amazon Athena queries from AWS Lambda (Python), based on sample code.