Introduction

Amazon Athena is a serverless, interactive query service that lets you analyze data stored in Amazon S3 using standard SQL.

In this article, we will look at how to use the AWS SDK for Python (Boto3) with Athena to query structured data stored in Amazon S3. This tutorial covers the following steps:

  • Creating an S3 bucket
  • Storing structured data in S3
  • Checking the status of a query in Athena
  • Creating a new database in Athena
  • Creating a new table in Athena
  • Querying a table in Athena
  • Accessing the results of a query


Prerequisites

  • AWS CLI
  • Python 3
  • Boto3: Boto3 can be installed using pip: pip install boto3
  • AWS credentials: If you haven’t set up your AWS credentials before, the AWS documentation on configuring credentials for the AWS CLI and Boto3 is a good place to start.

How to create an S3 bucket?

We will use the AWS CLI to create a new S3 bucket.

aws s3 mb s3://learnaws-athena-tutorial

Output:

make_bucket: learnaws-athena-tutorial
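`aws s3 mb` fails if the bucket name breaks the S3 naming rules (or if the name is already taken, because bucket names are globally unique). As a rough local pre-check, the sketch below tests a subset of the rules for general purpose buckets: 3 to 63 characters; only lowercase letters, digits, periods and hyphens; starting and ending with a letter or digit; no adjacent periods; and not formatted like an IP address. The function name is hypothetical and the check is not exhaustive, so S3 remains the final authority.

```python
import re

# Lowercase letters, digits, periods and hyphens; must start and end with a
# letter or digit; 3 to 63 characters in total.
_BUCKET_PATTERN = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")
# Four dot-separated groups of digits, e.g. 192.168.5.4.
_IP_ADDRESS = re.compile(r"\d{1,3}(\.\d{1,3}){3}")


def is_valid_bucket_name(name: str) -> bool:
    """Check a subset of the S3 bucket naming rules before calling `aws s3 mb`."""
    if not _BUCKET_PATTERN.fullmatch(name):
        return False
    if ".." in name:  # adjacent periods are not allowed
        return False
    if _IP_ADDRESS.fullmatch(name):  # names must not look like an IP address
        return False
    return True


if __name__ == "__main__":
    print(is_valid_bucket_name("learnaws-athena-tutorial"))
```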

How to store structured data in S3 bucket?

For this tutorial, we will use a dataset of TechCrunch funding data saved as athena_data.csv. To upload the file to S3, use the following command:

aws s3 cp athena_data.csv s3://learnaws-athena-tutorial/input/

Output:

upload: ./athena_data.csv to s3://learnaws-athena-tutorial/input/athena_data.csv
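The upload destination above, the table LOCATION and the query OutputLocation used later are all S3 URIs of the form s3://bucket/key. Athena expects a table LOCATION to be a prefix (a "folder" ending in /) rather than a single object, which is why the table created later points at the input/ prefix and not at athena_data.csv itself. If you need the bucket and key separately, a helper like the hypothetical split_s3_uri below can split them with the standard library. One caveat of this sketch: urlparse treats ? and # as query and fragment delimiters, so keys containing those characters would need different handling.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into ('bucket', 'key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    # urlparse keeps the leading slash in the path; S3 keys do not include it.
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    print(split_s3_uri("s3://learnaws-athena-tutorial/input/athena_data.csv"))
```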

How to check the status of a query in Athena?

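Athena runs queries asynchronously: start_query_execution returns a QueryExecutionId right away while the query keeps running in the background. We will use the get_query_execution method to check the status of a query; it takes the QueryExecutionId of the query as input. The helper below polls up to five times, waiting 30 seconds between checks, and returns True only if the query reaches the SUCCEEDED state. It relies on the CLIENT object created in the next section.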


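import time


def has_query_succeeded(execution_id):
    """Poll Athena until the query finishes; return True only on SUCCEEDED."""
    state = "RUNNING"
    max_execution = 5  # number of status checks before giving up

    # QUEUED and RUNNING are the non-terminal states; the loop stops on
    # anything else (SUCCEEDED, FAILED or CANCELLED).
    while max_execution > 0 and state in ["RUNNING", "QUEUED"]:
        max_execution -= 1
        response = CLIENT.get_query_execution(QueryExecutionId=execution_id)
        if (
            "QueryExecution" in response
            and "Status" in response["QueryExecution"]
            and "State" in response["QueryExecution"]["Status"]
        ):
            state = response["QueryExecution"]["Status"]["State"]
            if state == "SUCCEEDED":
                return True

        time.sleep(30)  # wait before the next status check

    return False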
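The helper above checks at most five times with a fixed 30-second pause, so a query still running after the fifth check (about two minutes in) is reported as failed, and even a quick query costs a full 30 seconds if it is still QUEUED or RUNNING at the first check. A common alternative is exponential backoff: start with a short delay and double it up to a cap. The sketch below is one way to do that; the function name, defaults and the injectable sleep parameter are assumptions, and the state lookup is passed in as a function so the logic can be tested without AWS.

```python
import time
from typing import Callable

# Athena query states that will not change any further.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    max_attempts: int = 10,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() with exponential backoff; return the last state seen."""
    delay = initial_delay
    state = "UNKNOWN"
    for attempt in range(max_attempts):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if attempt < max_attempts - 1:  # don't sleep after the final check
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to the cap
    return state
```

With the tutorial's client, get_state could be `lambda: CLIENT.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]`. Returning the state itself, rather than a boolean, lets the caller tell a FAILED query apart from one that simply ran out of attempts.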

How to create a new database in Athena?

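We will use the start_query_execution method to create a new database in Athena. Every statement, including DDL such as CREATE DATABASE, is submitted this way, and the ResultConfiguration tells Athena where in S3 to write the query output.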


import boto3

CLIENT = boto3.client("athena")

DATABASE_NAME = "athena_tutorial"
RESULT_OUTPUT_LOCATION = "s3://learnaws-athena-tutorial/queries/"

def create_database():
    response = CLIENT.start_query_execution(
        QueryString=f"create database {DATABASE_NAME}",
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]

We are creating a new database named athena_tutorial and storing the output of the query in s3://learnaws-athena-tutorial/queries/.
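start_query_execution also accepts an optional QueryExecutionContext parameter that sets the default database for a query, so later queries could refer to funding_data without the athena_tutorial. prefix. The sketch below (build_query_request is a hypothetical helper) only assembles the keyword arguments as a dict, which keeps the request shape in one place and makes it easy to inspect; it would be used as `CLIENT.start_query_execution(**build_query_request(...))`.

```python
from typing import Any, Dict, Optional


def build_query_request(
    query: str, output_location: str, database: Optional[str] = None
) -> Dict[str, Any]:
    """Build keyword arguments for the Athena start_query_execution call."""
    request: Dict[str, Any] = {
        "QueryString": query,
        "ResultConfiguration": {"OutputLocation": output_location},
    }
    if database is not None:
        # Lets the query use unqualified table names.
        request["QueryExecutionContext"] = {"Database": database}
    return request


if __name__ == "__main__":
    print(build_query_request(
        "SELECT COUNT(*) FROM funding_data",
        "s3://learnaws-athena-tutorial/queries/",
        database="athena_tutorial",
    ))
```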

How to create a new table in Athena?

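We will create a table called funding_data in Athena based on the schema of our CSV file. To do so, we will write the following DDL and store it in a file named funding_data.ddl. Every column is declared as string, and OpenCSVSerde parses the comma-separated, double-quoted fields.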

CREATE EXTERNAL TABLE IF NOT EXISTS
athena_tutorial.funding_data (
  Permalink string,
  Company string,
  NumEmps string,
  Category string,
  City string,
  State string,
  FundedDate string,
  RaisedAmt string,
  RaisedCurrency string,
  Round string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '\"',
  'escapeChar' = '\\'
)
STORED AS TEXTFILE
LOCATION 's3://learnaws-athena-tutorial/input/';

Next, we will use this DDL file to initiate a query to create the table in Athena.

TABLE_DDL = "funding_data.ddl"

def create_table():
    with open(TABLE_DDL) as ddl:
        response = CLIENT.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
        )

        return response["QueryExecutionId"]
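Writing funding_data.ddl by hand is fine for ten columns, but the column list can also be generated from the CSV header. The sketch below (read_header and csv_table_ddl are hypothetical helpers) reproduces the DDL above with every column typed as string. It also offers an optional TBLPROPERTIES ('skip.header.line.count'='1') clause, which the original DDL does not include; Athena uses it to skip the first line of each file so that column names are not read as a data row. Whether you want it depends on whether your copy of athena_data.csv starts with a header line.

```python
import csv
import io
from typing import Iterable, List


def read_header(lines: Iterable[str]) -> List[str]:
    """Return the first CSV row (the column names)."""
    return next(csv.reader(lines))


def csv_table_ddl(columns: List[str], table: str, location: str,
                  skip_header: bool = False) -> str:
    """Build an OpenCSVSerde table definition with every column typed as string."""
    column_lines = ",\n".join(f"  {name} string" for name in columns)
    ddl = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS\n{table} (\n{column_lines}\n"
        ") ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'\n"
        "WITH SERDEPROPERTIES (\n"
        "  'separatorChar' = ',',\n"
        "  'quoteChar' = '\\\"',\n"   # emits 'quoteChar' = '\"'
        "  'escapeChar' = '\\\\'\n"   # emits 'escapeChar' = '\\'
        ")\n"
        "STORED AS TEXTFILE\n"
        f"LOCATION '{location}'"
    )
    if skip_header:
        # Ask Athena to ignore the header line in each file.
        ddl += "\nTBLPROPERTIES ('skip.header.line.count'='1')"
    return ddl + ";"


if __name__ == "__main__":
    header = read_header(io.StringIO("Permalink,Company,NumEmps\n"))
    print(csv_table_ddl(header, "athena_tutorial.funding_data",
                        "s3://learnaws-athena-tutorial/input/"))
```

For the real file, `read_header(open("athena_data.csv"))` would supply the column list.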

How to query a table in Athena?

We can use standard SQL to query the table. In the following example, we will retrieve the number of rows in our dataset:


TABLE_NAME = "funding_data"


def get_num_rows():
    query = f"SELECT COUNT(*) FROM {DATABASE_NAME}.{TABLE_NAME}"
    response = CLIENT.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]
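Because the query string is assembled with an f-string, whatever is in DATABASE_NAME or TABLE_NAME becomes part of the SQL. That is harmless with the fixed constants used here, but if the names ever come from user input it is safer to validate them first. Athena database, table and column names may contain only lowercase letters, digits and underscores, so the sketch below rejects anything else and wraps the accepted names in double quotes, which is how identifiers are quoted in Athena SELECT queries. checked_name and count_rows_query are hypothetical names.

```python
import re

# Athena database, table and column names: lowercase letters, digits, underscores.
_ATHENA_NAME = re.compile(r"[a-z0-9_]+")


def checked_name(name: str) -> str:
    """Return name unchanged if it is a plain Athena identifier, else raise."""
    if not _ATHENA_NAME.fullmatch(name):
        raise ValueError(f"unexpected Athena identifier: {name!r}")
    return name


def count_rows_query(database: str, table: str) -> str:
    """Build the row-count query only from names that passed validation."""
    return f'SELECT COUNT(*) FROM "{checked_name(database)}"."{checked_name(table)}"'


if __name__ == "__main__":
    print(count_rows_query("athena_tutorial", "funding_data"))
```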

How to access the results of a query?

Athena writes the results of every query to the output location provided when the query was started, here s3://learnaws-athena-tutorial/queries/. We can also fetch the results through the API with the get_query_results method:

def get_query_results(execution_id):
    response = CLIENT.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response['ResultSet']['Rows']
    return results

Output:

[{'Data': [{'VarCharValue': '_col0'}]}, {'Data': [{'VarCharValue': '1461'}]}]
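Two details of this output are worth handling in code. First, for a SELECT query the first row holds the column names (_col0 is the name Athena gives an unnamed expression such as COUNT(*); an alias like COUNT(*) AS num_rows gives a friendlier name), and every value arrives as a string, so the count needs int() to become a number. Second, get_query_results returns at most 1,000 rows per call, plus a NextToken when more remain. The sketch below (function names are hypothetical) follows NextToken through a caller-supplied fetch function and turns the rows into dictionaries. boto3 also provides a built-in paginator, `CLIENT.get_paginator("get_query_results")`, that can replace the manual loop.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

Page = Dict[str, Any]


def iter_result_rows(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[Dict[str, Any]]:
    """Yield every row of a result set, following NextToken across pages."""
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        yield from page["ResultSet"]["Rows"]
        token = page.get("NextToken")
        if not token:  # no NextToken means this was the last page
            return


def rows_to_records(rows: List[Dict[str, Any]]) -> List[Dict[str, Optional[str]]]:
    """Use the first row as column names; NULL cells (no VarCharValue) become None."""
    if not rows:
        return []
    header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
    return [
        dict(zip(header, (cell.get("VarCharValue") for cell in row["Data"])))
        for row in rows[1:]
    ]


# Hypothetical wiring to the tutorial's client:
# def fetch_page(token):
#     kwargs = {"QueryExecutionId": execution_id}
#     if token:
#         kwargs["NextToken"] = token
#     return CLIENT.get_query_results(**kwargs)
#
# records = rows_to_records(list(iter_result_rows(fetch_page)))
# num_rows = int(records[0]["_col0"])
```

Applied to the output above, rows_to_records returns [{'_col0': '1461'}].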

Putting it all together

The complete program looks like this:


import boto3
import time

CLIENT = boto3.client("athena")

DATABASE_NAME = "athena_tutorial"
RESULT_OUTPUT_LOCATION = "s3://learnaws-athena-tutorial/queries/"
TABLE_DDL = "funding_data.ddl"
TABLE_NAME = "funding_data"


def has_query_succeeded(execution_id):
    state = "RUNNING"
    max_execution = 5

    while max_execution > 0 and state in ["RUNNING", "QUEUED"]:
        max_execution -= 1
        response = CLIENT.get_query_execution(QueryExecutionId=execution_id)
        if (
            "QueryExecution" in response
            and "Status" in response["QueryExecution"]
            and "State" in response["QueryExecution"]["Status"]
        ):
            state = response["QueryExecution"]["Status"]["State"]
            if state == "SUCCEEDED":
                return True

        time.sleep(30)

    return False


def create_database():
    response = CLIENT.start_query_execution(
        QueryString=f"create database {DATABASE_NAME}",
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]


def create_table():
    with open(TABLE_DDL) as ddl:
        response = CLIENT.start_query_execution(
            QueryString=ddl.read(),
            ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
        )

        return response["QueryExecutionId"]


def get_num_rows():
    query = f"SELECT COUNT(*) FROM {DATABASE_NAME}.{TABLE_NAME}"
    response = CLIENT.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": RESULT_OUTPUT_LOCATION}
    )

    return response["QueryExecutionId"]


def get_query_results(execution_id):
    response = CLIENT.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response['ResultSet']['Rows']
    return results


def main():
    # 1. Create Database
    execution_id = create_database()
    print(f"Checking query execution for: {execution_id}")

    # 2. Check query execution
    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 3. Create Table
    execution_id = create_table()
    print(f"Create Table execution id: {execution_id}")

    # 4. Check query execution
    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 5. Query Athena table
    execution_id = get_num_rows()
    print(f"Get Num Rows execution id: {execution_id}")

    query_status = has_query_succeeded(execution_id=execution_id)
    print(f"Query state: {query_status}")

    # 6. Query Results
    print(get_query_results(execution_id=execution_id))


if __name__ == "__main__":
    main()
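As written, main() prints each query state and carries on even when a step fails, so a failed CREATE TABLE only surfaces later, when get_query_results is called on a query that never succeeded. One variation is to stop at the first failed step. The sketch below shows that idea; run_steps is a hypothetical helper that takes (name, start function) pairs plus a success check, so it can be exercised with stand-ins instead of AWS.

```python
from typing import Callable, List, Sequence, Tuple


def run_steps(steps: Sequence[Tuple[str, Callable[[], str]]],
              succeeded: Callable[[str], bool]) -> List[str]:
    """Start each query in order and stop at the first one that does not succeed."""
    execution_ids: List[str] = []
    for name, start_query in steps:
        execution_id = start_query()  # e.g. create_database()
        execution_ids.append(execution_id)
        print(f"{name}: execution id {execution_id}")
        if not succeeded(execution_id):  # e.g. has_query_succeeded
            raise RuntimeError(f"step {name!r} did not succeed ({execution_id})")
    return execution_ids
```

With the functions above, main() could call `run_steps([("create database", create_database), ("create table", create_table), ("count rows", get_num_rows)], has_query_succeeded)` and pass the last returned execution id to get_query_results.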