Introduction

AWS Athena is a serverless query platform that makes it easy to query and analyze data in Amazon S3 using standard SQL. AWS Glue is a fully-managed ETL service. Athena integrates with AWS Glue Crawlers to automatically infer database and table schema from data stored in S3. The associated metadata is stored in AWS Glue Data Catalog,

In this article, we will look at how to use the Amazon Boto3 library to build a data pipeline. We will be discussing the following steps in this tutorial:

  • Creating an S3 bucket and storing our dataset
  • Creating an IAM role to support AWS Glue Crawler
  • Creating a Glue Crawler
  • Running the Glue Crawler
  • Querying the database table using Athena
  • Accessing the result of the query

Table of contents

Prerequisites

  • AWS CLI
  • Python3
  • Boto3: Boto3 can be installed using pip: pip install boto3
  • AWS Credentials: If you haven’t setup your AWS credentials before, this resource from AWS is helpful.

How to create an S3 bucket?

We will use the AWS CLI to create a new S3 bucket.

aws s3 mb s3://learnaws-glue-athena-tutorial

Output:

make_bucket: learnaws-glue-athena-tutorial

How to store structured data in S3 bucket?

We will store this dataset on funding data from Techcrunch for this tutorial. To upload this file to S3, use the following commands:

aws s3 cp athena_data.csv s3://learnaws-glue-athena-tutorial/input/

Output:

upload: ./athena_data.csv to s3://learnaws-athena-tutorial/input/athena_data.csv

How to create an IAM role?

Before we can create an AWS Glue Crawler, we need to create an IAM role and attach IAM policies to it with the correct permissions to run a Glue crawler job.


def create_iam_policy(s3_bucket_name):
    # Create IAM client
    iam = boto3.client("iam")

    # Create a policy
    glue_s3_crawler_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}"]
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"]
            }
        ]
    }

    response = iam.create_policy(
        PolicyName='glueS3CrawlerPolicy',
        PolicyDocument=json.dumps(glue_s3_crawler_policy)
    )

    return response["Policy"]["Arn"]


def create_iam_role():
    iam = boto3.client("iam")

    assume_role_policy_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
            }
        ]
    })

    response = iam.create_role(
        RoleName = "glueS3CrawlerRole",
        AssumeRolePolicyDocument = assume_role_policy_document
    )

    return response["Role"]["RoleName"]


def attach_iam_policy(policy_arn, role_name):
    iam = boto3.client("iam")

    response = iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )

    print(response)

How to run:

    s3_bucket_name = "learnaws-glue-athena-tutorial"

    # 1. Create IAM Policy
    print("Creating IAM policy")
    policy_arn = create_iam_policy(s3_bucket_name=s3_bucket_name)

    # 2. Create IAM Role
    print("Creating IAM role")
    role_name = create_iam_role()

    # 3. Attach IAM policy
    print("Attaching IAM policy")
    attach_iam_policy(policy_arn=policy_arn, role_name=role_name)

    # 4. Attach Glue Managed policy
    service_policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    attach_iam_policy(policy_arn=service_policy_arn, role_name=role_name)

How to create a Glue crawler?

Next, we will create a Glue crawler that will populate the AWS Glue Data catalog with tables. We will be using the create_crawler method from the Boto3 library to create the crawler. The Glue crawler will crawl the S3 bucket that we just created and then populate the table in the database name that we provide as part of the input.


def create_glue_crawler(crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions):
    glue_client = boto3.client("glue")

    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=iam_role_name,
        DatabaseName=db_name,
        Targets={
            "S3Targets": [
                {
                    "Path": s3_path,
                    "Exclusions": s3_path_exclusions
                }
            ]
        }
    )

    print(response)

How to run:


crawler_name = "GlueTutorialCrawler"
glue_database_name = "GlueTutorialDB"

create_glue_crawler(
    crawler_name=crawler_name,
    iam_role_name=role_name, # from the IAM step
    db_name=glue_database_name,
    s3_path=f"s3://{s3_bucket_name}/input", # s3 bucket
    s3_path_exclusions=[],
)

Once the crawler has been created successfully, you should be able to see it in the AWS console:

AWS Glue crawler

How to run the Glue crawler?

After creating the Glue crawler, we will run it so that we can populate our table with the data in the S3 bucket. We will use the start_crawler method to start the crawler.


def start_crawler(crawler_name):
    glue_client = boto3.client("glue")

    response = glue_client.start_crawler(
        Name=crawler_name
    )

    print(response)

If we check the AWS console after running the crawler, we should be able to see that our table was created.

AWS Glue crawler

How to run a query in Athena?

We can use standard SQL to query the table. In the following example, we will retrieve the number of rows in our dataset:


def get_num_rows(database_name, table_name, output_location):
    athena_client = boto3.client("athena")

    query = f"SELECT COUNT(*) from {database_name}.{table_name}"
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )

    return response["QueryExecutionId"]


How to access the results of a query?

The results of any query is stored in the output location provided during the query itself. We can access the results of the query as follows:


def get_query_results(execution_id):
    athena_client = boto3.client("athena")

    response = athena_client.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response["ResultSet"]["Rows"]
    return results

Output:

[{'Data': [{'VarCharValue': '_col0'}]}, {'Data': [{'VarCharValue': '1461'}]}]

Putting it all together

The whole program put together would look something like this:


import boto3
import json

def create_iam_policy(s3_bucket_name):
    # Create IAM client
    iam = boto3.client("iam")

    # Create a policy
    glue_s3_crawler_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}"]
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"]
            }
        ]
    }

    response = iam.create_policy(
        PolicyName='glueS3CrawlerPolicy',
        PolicyDocument=json.dumps(glue_s3_crawler_policy)
    )

    return response["Policy"]["Arn"]


def create_iam_role():
    iam = boto3.client("iam")

    assume_role_policy_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
            }
        ]
    })

    response = iam.create_role(
        RoleName = "glueS3CrawlerRole",
        AssumeRolePolicyDocument = assume_role_policy_document
    )

    return response["Role"]["RoleName"]


def attach_iam_policy(policy_arn, role_name):
    iam = boto3.client("iam")

    response = iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )

    print(response)


def create_glue_crawler(crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions):
    glue_client = boto3.client("glue")

    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=iam_role_name,
        DatabaseName=db_name,
        Targets={
            "S3Targets": [
                {
                    "Path": s3_path,
                    "Exclusions": s3_path_exclusions
                }
            ]
        }
    )

    print(response)


def start_crawler(crawler_name):
    glue_client = boto3.client("glue")

    response = glue_client.start_crawler(
        Name=crawler_name
    )

    print(response)



def get_num_rows(database_name, table_name, output_location):
    athena_client = boto3.client("athena")

    query = f"SELECT COUNT(*) from {database_name}.{table_name}"
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )

    return response["QueryExecutionId"]


def get_query_results(execution_id):
    athena_client = boto3.client("athena")

    response = athena_client.get_query_results(
        QueryExecutionId=execution_id
    )

    results = response["ResultSet"]["Rows"]
    return results


def main():
    s3_bucket_name = "learnaws-glue-athena-tutorial"

    # 1. Create IAM Policy
    print("Creating IAM policy")
    policy_arn = create_iam_policy(s3_bucket_name=s3_bucket_name)

    # 2. Create IAM Role
    print("Creating IAM role")
    role_name = create_iam_role()

    # 3. Attach IAM policy
    print("Attaching IAM policy")
    attach_iam_policy(policy_arn=policy_arn, role_name=role_name)

    # 4. Attach AWS Managed Role
    service_role_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    attach_iam_policy(policy_arn=policy_arn, role_name=service_role_arn)


    # 5. Create Glue Crawler for path
    print("Creating Glue crawler")
    crawler_name = "GlueTutorialCrawler"
    glue_database_name = "GlueTutorialDB"

    create_glue_crawler(
        crawler_name=crawler_name,
        iam_role_name="glueS3CrawlerRole",
        db_name=glue_database_name,
        s3_path=f"s3://{s3_bucket_name}/input",
        s3_path_exclusions=[],
    )

    # 6. Start crawler
    print("Starting Glue crawler")
    start_crawler(crawler_name=crawler_name)

    # 7. Make athena query
    database_name = "gluetutorialdb"
    table_name = "input"
    output_location ="s3://learnaws-glue-athena-tutorial/queries/"

    print("Querying athena:")
    execution_id = get_num_rows(database_name=database_name, table_name=table_name, output_location=output_location)

    # 8. Retrieve results
    print("retrieving results")
    print(get_query_results(execution_id=execution_id))


if __name__ == "__main__":
    main()