AWS Athena, Boto3 and Python: Complete Guide with examples
Introduction
AWS Athena is a serverless query platform that makes it easy to query and analyze data in Amazon S3 using standard SQL.
In this article, we will look at how to use the Boto3 library to query structured data stored in Amazon S3 through Athena. We will cover the following steps in this tutorial:
- Creating an S3 bucket
- Storing structured data in S3
- Creating a new database in Athena
- Creating a new table in Athena
- Querying a table in Athena
- Accessing the results of a query
Table of contents
- Introduction
- Prerequisites
- How to create an S3 bucket?
- How to store structured data in S3 bucket?
- How to check the status of a query in Athena?
- How to create a new database in Athena?
- How to create a new table in Athena?
- How to query a table in Athena?
- How to access the results of a query?
- Putting it all together
Prerequisites
- AWS CLI
- Python3
- Boto3: Boto3 can be installed using pip:
pip install boto3
- AWS Credentials: If you haven't set up your AWS credentials before, this resource from AWS is helpful.
How to create an S3 bucket?
We will use the AWS CLI to create a new S3 bucket.
aws s3 mb s3://learnaws-athena-tutorial
Output:
make_bucket: learnaws-athena-tutorial
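The same bucket could also be created from Python. The following is a minimal sketch, not part of the original walkthrough: the helper name create_bucket and its region parameter are hypothetical, and s3_client is assumed to be a Boto3 S3 client such as boto3.client("s3").

```python
def create_bucket(s3_client, bucket_name, region="us-east-1"):
    """Create an S3 bucket, roughly mirroring `aws s3 mb`.

    `s3_client` is assumed to be a Boto3 S3 client, for example
    boto3.client("s3"). The helper name and `region` default are
    illustrative only.
    """
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and is not passed as a
    # LocationConstraint; other regions have to be named explicitly.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return s3_client.create_bucket(**kwargs)
```

With real credentials this would be called as create_bucket(boto3.client("s3"), "learnaws-athena-tutorial").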
How to store structured data in S3 bucket?
For this tutorial, we will use a dataset of funding data from TechCrunch, saved locally as athena_data.csv. To upload this file to S3, use the following command:
aws s3 cp athena_data.csv s3://learnaws-athena-tutorial/input/
Output:
upload: ./athena_data.csv to s3://learnaws-athena-tutorial/input/athena_data.csv
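The upload can be done with Boto3 as well. This is a small sketch under assumptions: upload_input_file is a hypothetical helper name, s3_client is assumed to be boto3.client("s3"), and the input/ prefix simply mirrors the CLI command above.

```python
import os


def upload_input_file(s3_client, local_path, bucket_name, prefix="input/"):
    """Upload a local file under `prefix` and return its S3 URI.

    `s3_client` is assumed to be a Boto3 S3 client; the helper name is
    illustrative.
    """
    # Keep only the file name so ./athena_data.csv becomes input/athena_data.csv
    key = prefix + os.path.basename(local_path)
    s3_client.upload_file(local_path, bucket_name, key)
    return f"s3://{bucket_name}/{key}"
```

For the file used in this tutorial, the call would be upload_input_file(boto3.client("s3"), "athena_data.csv", "learnaws-athena-tutorial").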
How to check the status of a query in Athena?
We will use the get_query_execution method to check the status of the query. The method takes the QueryExecutionId of the query as input.
import time

# CLIENT is the Athena client created in the next section.
def has_query_succeeded(execution_id):
    state = "RUNNING"
    max_execution = 5  # maximum number of status checks

    while max_execution > 0 and state in ["RUNNING", "QUEUED"]:
        max_execution -= 1
        response = CLIENT.get_query_execution(QueryExecutionId=execution_id)
        if (
            "QueryExecution" in response
            and "Status" in response["QueryExecution"]
            and "State" in response["QueryExecution"]["Status"]
        ):
            state = response["QueryExecution"]["Status"]["State"]
            if state == "SUCCEEDED":
                return True

        # Wait before checking again
        time.sleep(30)

    # The query failed, was cancelled, or did not finish in time
    return False
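The function above returns False both when a query fails and when it is still running after the last check. As a possible variant (a sketch, not the article's method), the poller below returns the final state together with the StateChangeReason field that Athena reports in the query status. The name wait_for_query, its parameters, and the "TIMED_OUT" label are assumptions made for this example; "TIMED_OUT" is not an Athena state.

```python
import time

# States after which Athena will not change the query status again
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, execution_id, poll_seconds=2, max_polls=30,
                   sleep=time.sleep):
    """Poll until the query reaches a terminal state.

    Returns (state, reason). `client` is assumed to be a Boto3 Athena
    client. "TIMED_OUT" is a label invented for this sketch.
    """
    for _ in range(max_polls):
        response = client.get_query_execution(QueryExecutionId=execution_id)
        status = response["QueryExecution"]["Status"]
        if status["State"] in TERMINAL_STATES:
            # StateChangeReason is optional; it is usually set on failures
            return status["State"], status.get("StateChangeReason")
        sleep(poll_seconds)
    return "TIMED_OUT", None
```

A caller could then print the reason when the state is not "SUCCEEDED" instead of only seeing False.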
How to create a new database in Athena?
We will use the start_query_execution method to create a new database in Athena.
import boto3

CLIENT = boto3.client("athena")

DATABASE_NAME = "athena_tutorial"
RESULT_OUTPUT_LOCATION = "s3://learnaws-athena-tutorial/queries/"

def create_database():
    response = CLIENT.start_query_execution(
        QueryString=f"create database {DATABASE_NAME}",
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    # The execution id is used later to check the status of the query
    return response["QueryExecutionId"]
We are creating a new database named athena_tutorial and storing the output of the query in s3://learnaws-athena-tutorial/queries/.
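Running the script a second time would submit create database again for a database that already exists, and that query is expected to fail. One way to make the step repeatable is the IF NOT EXISTS clause, sketched below. The helper name and its parameters are hypothetical; client is assumed to be boto3.client("athena").

```python
def create_database_if_missing(client, database_name, output_location):
    """Start a CREATE DATABASE IF NOT EXISTS query and return its execution id.

    `client` is assumed to be a Boto3 Athena client; the helper name is
    illustrative.
    """
    response = client.start_query_execution(
        # IF NOT EXISTS turns the statement into a no-op when the
        # database is already present
        QueryString=f"CREATE DATABASE IF NOT EXISTS {database_name}",
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```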
How to create a new table in Athena?
We will create a table called funding_data in Athena based on the schema of our CSV. To do so, we will write the following DDL and store it in a file named funding_data.ddl.
CREATE EXTERNAL TABLE IF NOT EXISTS
athena_tutorial.funding_data (
    Permalink string,
    Company string,
    NumEmps string,
    Category string,
    City string,
    State string,
    FundedDate string,
    RaisedAmt string,
    RaisedCurrency string,
    Round string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '\"',
    'escapeChar' = '\\'
)
STORED AS TEXTFILE
LOCATION 's3://learnaws-athena-tutorial/input/';
Next, we will use this DDL file to initiate a query to create the table in Athena.
TABLE_DDL = "funding_data.ddl"

def create_table():
    with open(TABLE_DDL) as ddl:
        response = CLIENT.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
        )

        return response["QueryExecutionId"]
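If athena_data.csv begins with a header row (an assumption; the article does not say), that row would be read as data with the DDL above. Athena tables accept the skip.header.line.count table property for this case. The sketch below appends it to the DDL text before the query is submitted; with_header_skipped is a hypothetical helper, and it assumes the DDL ends with the LOCATION clause as shown above.

```python
def with_header_skipped(ddl_text):
    """Return the DDL with a TBLPROPERTIES clause that skips one header line.

    Assumes the statement ends with its LOCATION clause and has no
    TBLPROPERTIES clause yet. The helper name is illustrative.
    """
    # Drop surrounding whitespace and the trailing semicolon, if present
    statement = ddl_text.strip().rstrip(";").rstrip()
    return statement + "\nTBLPROPERTIES ('skip.header.line.count' = '1')"
```

The returned string could be passed as QueryString in place of the raw file contents.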
How to query a table in Athena?
We can use standard SQL to query the table. In the following example, we will retrieve the number of rows in our dataset:
TABLE_NAME = "funding_data"

def get_num_rows():
    query = f"SELECT COUNT(*) from {DATABASE_NAME}.{TABLE_NAME}"
    response = CLIENT.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]
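The query above qualifies the table with the database name. start_query_execution also accepts a QueryExecutionContext parameter that sets the database for the query, so the SQL can use the bare table name. A minimal sketch, where start_query and its parameters are hypothetical names and client is assumed to be boto3.client("athena"):

```python
def start_query(client, sql, database_name, output_location):
    """Start a query that runs in the context of `database_name`.

    `client` is assumed to be a Boto3 Athena client; the helper name is
    illustrative.
    """
    response = client.start_query_execution(
        QueryString=sql,
        # Unqualified table names in `sql` resolve against this database
        QueryExecutionContext={"Database": database_name},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

For example, the SQL could then be written as SELECT COUNT(*) FROM funding_data with database_name set to athena_tutorial.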
How to access the results of a query?
The results of a query are stored in the output location provided when the query was started. We can access the results of the query as follows:
def get_query_results(execution_id):
    response = CLIENT.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response['ResultSet']['Rows']
    return results
Output:
[{'Data': [{'VarCharValue': '_col0'}]}, {'Data': [{'VarCharValue': '1461'}]}]
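In the output above, the first row holds the column names and each later row holds the values. A small helper can turn that structure into a list of dictionaries. This is a sketch: rows_to_dicts is a hypothetical name, and it assumes the first row is the header, as it is in the sample output.

```python
def rows_to_dicts(rows):
    """Convert Athena result rows into a list of {column: value} dicts.

    Assumes rows[0] is the header row, as in the sample output.
    """
    if not rows:
        return []
    header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
    return [
        # Cells without a VarCharValue key become None
        dict(zip(header, (cell.get("VarCharValue") for cell in row["Data"])))
        for row in rows[1:]
    ]


sample = [{'Data': [{'VarCharValue': '_col0'}]}, {'Data': [{'VarCharValue': '1461'}]}]
print(rows_to_dicts(sample))  # [{'_col0': '1461'}]
```

All values are returned as strings, so a count like this one would still need int() before it is used as a number.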
Putting it all together
The whole program put together would look something like this:
import boto3
import time

CLIENT = boto3.client("athena")

DATABASE_NAME = "athena_tutorial"
RESULT_OUTPUT_LOCATION = "s3://learnaws-athena-tutorial/queries/"
TABLE_DDL = "funding_data.ddl"
TABLE_NAME = "funding_data"


def has_query_succeeded(execution_id):
    state = "RUNNING"
    max_execution = 5

    while max_execution > 0 and state in ["RUNNING", "QUEUED"]:
        max_execution -= 1
        response = CLIENT.get_query_execution(QueryExecutionId=execution_id)
        if (
            "QueryExecution" in response
            and "Status" in response["QueryExecution"]
            and "State" in response["QueryExecution"]["Status"]
        ):
            state = response["QueryExecution"]["Status"]["State"]
            if state == "SUCCEEDED":
                return True

        time.sleep(30)

    return False


def create_database():
    response = CLIENT.start_query_execution(
        QueryString=f"create database {DATABASE_NAME}",
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]


def create_table():
    with open(TABLE_DDL) as ddl:
        response = CLIENT.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
        )

        return response["QueryExecutionId"]


def get_num_rows():
    query = f"SELECT COUNT(*) from {DATABASE_NAME}.{TABLE_NAME}"
    response = CLIENT.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]


def get_query_results(execution_id):
    response = CLIENT.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response['ResultSet']['Rows']
    return results


def main():
    # 1. Create Database
    execution_id = create_database()
    print(f"Checking query execution for: {execution_id}")

    # 2. Check query execution
    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 3. Create Table
    execution_id = create_table()
    print(f"Create Table execution id: {execution_id}")

    # 4. Check query execution
    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 5. Query Athena table
    execution_id = get_num_rows()
    print(f"Get Num Rows execution id: {execution_id}")

    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 6. Query Results
    print(get_query_results(execution_id=execution_id))


if __name__ == "__main__":
    main()