How to build a data pipeline with AWS Boto3, Glue & Athena

Introduction
AWS Athena is a serverless, interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. AWS Glue is a fully managed ETL service. Athena integrates with AWS Glue Crawlers, which automatically infer database and table schemas from data stored in S3 and record the associated metadata in the AWS Glue Data Catalog.
In this article, we will look at how to use Boto3, the AWS SDK for Python, to build a data pipeline. We will cover the following steps in this tutorial:
- Creating an S3 bucket and storing our dataset
- Creating an IAM role to support AWS Glue Crawler
- Creating a Glue Crawler
- Running the Glue Crawler
- Querying the database table using Athena
- Accessing the result of the query
Table of contents
- Introduction
- Prerequisites
- How to create an S3 bucket?
- How to store structured data in S3 bucket?
- How to create an IAM role?
- How to create a Glue crawler?
- How to run the Glue crawler?
- How to run a query in Athena?
- How to access the results of a query?
- Putting it all together
Prerequisites
- AWS CLI
- Python3
- Boto3: Boto3 can be installed using pip:
pip install boto3
- AWS Credentials: If you haven’t set up your AWS credentials before, this resource from AWS is helpful.
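To confirm that Boto3 can pick up your credentials, a quick sanity check is to call STS and print the caller identity. A minimal sketch, assuming credentials are available via the default profile or environment variables:
import boto3

# Ask STS who we are; this fails fast if credentials are missing or invalid
sts = boto3.client("sts")
identity = sts.get_caller_identity()
print(identity["Account"], identity["Arn"])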
How to create an S3 bucket?
We will use the AWS CLI to create a new S3 bucket.
aws s3 mb s3://learnaws-glue-athena-tutorial
Output:
make_bucket: learnaws-glue-athena-tutorial
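If you prefer to stay in Python, the bucket can also be created with Boto3. A minimal sketch, assuming the us-east-1 region (other regions need a CreateBucketConfiguration with a LocationConstraint):
import boto3

s3 = boto3.client("s3")
# Bucket names are global; this call fails if the name is already taken
s3.create_bucket(Bucket="learnaws-glue-athena-tutorial")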
How to store structured data in S3 bucket?
For this tutorial, we will use this dataset of funding data from TechCrunch. To upload the file to S3, use the following command:
aws s3 cp athena_data.csv s3://learnaws-glue-athena-tutorial/input/
Output:
upload: ./athena_data.csv to s3://learnaws-glue-athena-tutorial/input/athena_data.csv
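If you would rather do the upload from Python as well, Boto3's upload_file does the same thing. A minimal sketch:
import boto3

s3 = boto3.client("s3")
# Upload the local CSV to the input/ prefix that the crawler will scan
s3.upload_file(
    Filename="athena_data.csv",
    Bucket="learnaws-glue-athena-tutorial",
    Key="input/athena_data.csv",
)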
How to create an IAM role?
Before we can create an AWS Glue Crawler, we need to create an IAM role and attach IAM policies to it with the correct permissions to run a Glue crawler job.
import boto3
import json

def create_iam_policy(s3_bucket_name):
    # Create IAM client
    iam = boto3.client("iam")
    # Create a policy that lets the crawler list the bucket and read its objects
    glue_s3_crawler_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}"]
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"]
            }
        ]
    }
    response = iam.create_policy(
        PolicyName="glueS3CrawlerPolicy",
        PolicyDocument=json.dumps(glue_s3_crawler_policy)
    )
    return response["Policy"]["Arn"]

def create_iam_role():
    iam = boto3.client("iam")
    # Trust policy that allows the Glue service to assume this role
    assume_role_policy_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "glue.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    })
    response = iam.create_role(
        RoleName="glueS3CrawlerRole",
        AssumeRolePolicyDocument=assume_role_policy_document
    )
    return response["Role"]["RoleName"]

def attach_iam_policy(policy_arn, role_name):
    iam = boto3.client("iam")
    response = iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )
    print(response)
How to run:
s3_bucket_name = "learnaws-glue-athena-tutorial"
# 1. Create IAM Policy
print("Creating IAM policy")
policy_arn = create_iam_policy(s3_bucket_name=s3_bucket_name)
# 2. Create IAM Role
print("Creating IAM role")
role_name = create_iam_role()
# 3. Attach IAM policy
print("Attaching IAM policy")
attach_iam_policy(policy_arn=policy_arn, role_name=role_name)
# 4. Attach Glue Managed policy
service_policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
attach_iam_policy(policy_arn=service_policy_arn, role_name=role_name)
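One caveat: IAM is eventually consistent, so a crawler created immediately after the role may fail because Glue cannot yet assume it. A simple hedge is to wait until IAM reports the role exists and then pause briefly before creating the crawler; this sketch uses Boto3's built-in role_exists waiter (the 10-second pause is an assumption, tune as needed):
import time
import boto3

iam = boto3.client("iam")
# Block until IAM reports the role exists, then give it a moment to propagate
iam.get_waiter("role_exists").wait(RoleName="glueS3CrawlerRole")
time.sleep(10)  # assumed propagation delay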
How to create a Glue crawler?
Next, we will create a Glue crawler that will populate the AWS Glue Data Catalog with tables. We will use the create_crawler method from the Boto3 library to create the crawler. The crawler will scan the S3 path we just populated and create a table in the database whose name we provide as input.
def create_glue_crawler(crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions):
    glue_client = boto3.client("glue")
    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=iam_role_name,
        DatabaseName=db_name,
        Targets={
            "S3Targets": [
                {
                    "Path": s3_path,
                    "Exclusions": s3_path_exclusions
                }
            ]
        }
    )
    print(response)
How to run:
crawler_name = "GlueTutorialCrawler"
glue_database_name = "GlueTutorialDB"
create_glue_crawler(
    crawler_name=crawler_name,
    iam_role_name=role_name,  # from the IAM step
    db_name=glue_database_name,
    s3_path=f"s3://{s3_bucket_name}/input",  # S3 path to crawl
    s3_path_exclusions=[],
)
Once the crawler has been created successfully, you should be able to see it in the AWS console.
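If you would rather confirm it programmatically, the get_crawler call returns the crawler's configuration and current state. A quick check might look like:
import boto3

glue_client = boto3.client("glue")
# Fetch the crawler we just created and print its current state
crawler = glue_client.get_crawler(Name="GlueTutorialCrawler")["Crawler"]
print(crawler["Name"], crawler["State"])  # e.g. GlueTutorialCrawler READY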
How to run the Glue crawler?
After creating the Glue crawler, we will run it so that it can populate our table with the data in the S3 bucket. We will use the start_crawler method to start the crawler.
def start_crawler(crawler_name):
    glue_client = boto3.client("glue")
    response = glue_client.start_crawler(
        Name=crawler_name
    )
    print(response)
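Note that start_crawler only kicks the crawl off: it returns immediately while the crawler runs in the background. Before querying the table, you will want to wait until the crawl has finished. A minimal polling sketch (the 10-second interval is an arbitrary choice):
import time
import boto3

def wait_for_crawler(crawler_name):
    glue_client = boto3.client("glue")
    # The crawler goes RUNNING -> STOPPING -> READY; poll until it is READY again
    while glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"] != "READY":
        time.sleep(10)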
If we check the AWS console after running the crawler, we should be able to see that our table was created.
How to run a query in Athena?
We can use standard SQL to query the table. In the following example, we will retrieve the number of rows in our dataset:
def get_num_rows(database_name, table_name, output_location):
    athena_client = boto3.client("athena")
    query = f"SELECT COUNT(*) from {database_name}.{table_name}"
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )
    return response["QueryExecutionId"]
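Like start_crawler, start_query_execution is asynchronous: it returns an execution ID right away while Athena runs the query. Before fetching results, poll the execution status until it leaves the QUEUED and RUNNING states. A minimal sketch:
import time
import boto3

def wait_for_query(execution_id):
    athena_client = boto3.client("athena")
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state not in ("QUEUED", "RUNNING"):
            return state  # SUCCEEDED, FAILED, or CANCELLED
        time.sleep(1)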
How to access the results of a query?
The results of any query are stored in the output location provided when the query was started. We can access the results of the query as follows:
def get_query_results(execution_id):
    athena_client = boto3.client("athena")
    response = athena_client.get_query_results(
        QueryExecutionId=execution_id
    )
    results = response["ResultSet"]["Rows"]
    return results
Output:
[{'Data': [{'VarCharValue': '_col0'}]}, {'Data': [{'VarCharValue': '1461'}]}]
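The rows come back in Athena's nested Datum format, with the first row holding the column headers. If you want plain Python values, a small flattening helper might look like this (a hypothetical helper, not part of the original program):
def flatten_rows(rows):
    # Each row is {"Data": [{"VarCharValue": ...}, ...]}; split off the header row
    header, *data = [[col.get("VarCharValue") for col in row["Data"]] for row in rows]
    return header, data

# e.g. flatten_rows(results) -> (['_col0'], [['1461']])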
Putting it all together
The whole program put together would look something like this:
import boto3
import json
import time
def create_iam_policy(s3_bucket_name):
    # Create IAM client
    iam = boto3.client("iam")
    # Create a policy
    glue_s3_crawler_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}"]
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{s3_bucket_name}/*"]
            }
        ]
    }
    response = iam.create_policy(
        PolicyName="glueS3CrawlerPolicy",
        PolicyDocument=json.dumps(glue_s3_crawler_policy)
    )
    return response["Policy"]["Arn"]

def create_iam_role():
    iam = boto3.client("iam")
    assume_role_policy_document = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "glue.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    })
    response = iam.create_role(
        RoleName="glueS3CrawlerRole",
        AssumeRolePolicyDocument=assume_role_policy_document
    )
    return response["Role"]["RoleName"]

def attach_iam_policy(policy_arn, role_name):
    iam = boto3.client("iam")
    response = iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn=policy_arn
    )
    print(response)

def create_glue_crawler(crawler_name, iam_role_name, db_name, s3_path, s3_path_exclusions):
    glue_client = boto3.client("glue")
    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=iam_role_name,
        DatabaseName=db_name,
        Targets={
            "S3Targets": [
                {
                    "Path": s3_path,
                    "Exclusions": s3_path_exclusions
                }
            ]
        }
    )
    print(response)

def start_crawler(crawler_name):
    glue_client = boto3.client("glue")
    response = glue_client.start_crawler(
        Name=crawler_name
    )
    print(response)

def get_num_rows(database_name, table_name, output_location):
    athena_client = boto3.client("athena")
    query = f"SELECT COUNT(*) from {database_name}.{table_name}"
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location}
    )
    return response["QueryExecutionId"]

def get_query_results(execution_id):
    athena_client = boto3.client("athena")
    response = athena_client.get_query_results(
        QueryExecutionId=execution_id
    )
    results = response["ResultSet"]["Rows"]
    return results
def main():
    s3_bucket_name = "learnaws-glue-athena-tutorial"

    # 1. Create IAM Policy
    print("Creating IAM policy")
    policy_arn = create_iam_policy(s3_bucket_name=s3_bucket_name)

    # 2. Create IAM Role
    print("Creating IAM role")
    role_name = create_iam_role()

    # 3. Attach IAM policy
    print("Attaching IAM policy")
    attach_iam_policy(policy_arn=policy_arn, role_name=role_name)

    # 4. Attach AWS managed Glue service policy
    service_policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    attach_iam_policy(policy_arn=service_policy_arn, role_name=role_name)

    # 5. Create Glue Crawler for path
    print("Creating Glue crawler")
    crawler_name = "GlueTutorialCrawler"
    glue_database_name = "GlueTutorialDB"
    create_glue_crawler(
        crawler_name=crawler_name,
        iam_role_name=role_name,
        db_name=glue_database_name,
        s3_path=f"s3://{s3_bucket_name}/input",
        s3_path_exclusions=[],
    )

    # 6. Start crawler and wait for it to finish populating the table
    print("Starting Glue crawler")
    start_crawler(crawler_name=crawler_name)
    glue_client = boto3.client("glue")
    while glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"] != "READY":
        time.sleep(10)

    # 7. Make athena query (Glue lowercases catalog names)
    database_name = "gluetutorialdb"
    table_name = "input"
    output_location = "s3://learnaws-glue-athena-tutorial/queries/"
    print("Querying athena:")
    execution_id = get_num_rows(database_name=database_name, table_name=table_name, output_location=output_location)

    # 8. Wait for the query to finish, then retrieve results
    athena_client = boto3.client("athena")
    while athena_client.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
        time.sleep(1)
    print("Retrieving results")
    print(get_query_results(execution_id=execution_id))

if __name__ == "__main__":
    main()