This tutorial is a modification of Running a Data Analytics DAG in Google Cloud that shows how to connect your Cloud Composer environment to Amazon Web Services (AWS) so you can use data stored there. It shows how to use Cloud Composer to create an Apache Airflow DAG (workflow) that joins data from a BigQuery public dataset and a CSV file stored in an AWS S3 bucket, and then runs a Dataproc Serverless batch job to process the joined data.
The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.
The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"
Objectives
- Create a Cloud Composer environment in the default configuration
- Create a bucket in AWS S3
- Create an empty BigQuery dataset
- Create a new Cloud Storage bucket
- Create and run a DAG that includes the following tasks:
- Load an external dataset from S3 to Cloud Storage
- Load an external dataset from Cloud Storage to BigQuery
- Join two datasets in BigQuery
- Run a data analytics PySpark job
- Clean up the project
Before you begin
Manage permissions in AWS
- Create an AWS account
Follow the "Creating policies with the visual editor section" of the Creating IAM Policies AWS tutorial to create a customized IAM policy for AWS S3 with the following configuration:
- Service: S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
), for viewing your S3 bucket - CreateBucket (
s3:CreateBucket
), for creating a bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
), for creating a bucket - ListBucket (
s3:ListBucket
), for granting permission to list objects in a S3 bucket - PutObject (
s3:PutObject
), for uploading files to a bucket - GetBucketVersioning (
s3:GetBucketVersioning
), for deleting an object in a bucket - DeleteObject (
s3:DeleteObject
), for deleting an object in a bucket - ListBucketVersions (
s3:ListBucketVersions
), for deleting a bucket - DeleteBucket (
s3:DeleteBucket
), for deleting a bucket - Resources: Choose "Any" next to "bucket" and "object" to grant permissions to any resources of that type.
- Tag: None
- Name: TutorialPolicy
Refer to the list of actions supported in Amazon S3 for more information about each configuration found above.
Manage permissions in Google Cloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Dataproc, Composer, BigQuery, and Storage APIs.
- Grant the following roles to manage Cloud Composer environments and environment buckets:
- Composer > Environment and Storage Object Administrator
- IAM > Service Account User
- Grant the following role to create a BigQuery dataset:
- BigQuery > Data Owner
- Grant the following role to create a Cloud Storage bucket:
- Storage > Admin
Create and prepare your Cloud Composer environment
- Create a Cloud Composer environment:
- Choose a US-based compute region
- Choose the latest Cloud Composer version
- Grant the following roles to the service account used in your Cloud Composer environment so that the Airflow workers can successfully run DAG tasks:
- BigQuery User
- BigQuery Data Owner
- Service Account User
- Dataproc Editor
- Dataproc Worker
Create and modify related resources in Google Cloud
- Install the apache-airflow-providers-amazon PyPI dependency in your Cloud Composer environment. Refer to the appropriate Airflow constraints file to check for compatibility if you want to pin a specific version.
- Create an empty BigQuery dataset with the following parameters:
  - Name: holiday_weather
  - Region: US
- Create a new Cloud Storage bucket in the US multi-region.
- Run the following command to enable Private Google Access on the default subnet in the region where you plan to run Dataproc Serverless, which fulfills its networking requirements. We recommend using the same region as your Cloud Composer environment.

gcloud compute networks subnets update default \
    --region DATAPROC_SERVERLESS_REGION \
    --enable-private-ip-google-access
Create related resources in AWS
- Create an S3 bucket with default settings in your preferred region.
Connect to AWS from Cloud Composer
- Get your AWS access key ID and secret access key
- Add your AWS S3 connection using the Airflow UI:
  - Go to Admin > Connections
  - Create a new connection with the following configuration:
    - Connection Id: aws_s3_connection
    - Connection Type: Amazon S3
    - Extras: {"aws_access_key_id": "your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
DAG tasks reference this connection by its connection ID, as sketched below.
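To check that the connection works, you can run a small snippet from an Airflow task. The following is a minimal sketch, assuming the amazon provider package is installed; the bucket name is a placeholder, and this snippet is not part of the tutorial DAG itself.

```python
# Minimal sketch: verify the aws_s3_connection Airflow connection by listing
# the objects in your S3 bucket. The bucket name below is a placeholder.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def check_s3_connection():
    hook = S3Hook(aws_conn_id="aws_s3_connection")
    # Returns a list of object keys in the bucket (empty if the bucket is empty).
    print(hook.list_keys(bucket_name="your-tutorial-s3-bucket"))
```

You could call this function from a PythonOperator task in a throwaway DAG and check the task logs for the listed keys.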
Running the DAG
Data processing using Dataproc Serverless
Explore the example PySpark Job
The code shown below is an example PySpark job that converts temperature values from tenths of a degree Celsius to degrees Celsius. This job converts the temperature data from the dataset into a different format.
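The following is a minimal sketch of such a job, not necessarily the exact data_analytics_process.py from the tutorial. It assumes the spark-bigquery connector available in Dataproc Serverless and takes the input and output BigQuery tables as command-line arguments; the argument order and the write method are assumptions.

```python
# Sketch of a PySpark job that reads a BigQuery table, converts the `value`
# column from tenths of a degree Celsius to degrees Celsius, and writes the
# result to another BigQuery table. Table names arrive as CLI arguments.
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    # Hypothetical usage: data_analytics_process.py INPUT_TABLE OUTPUT_TABLE
    input_table, output_table = sys.argv[1], sys.argv[2]

    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Read the joined holidays/weather data from BigQuery.
    df = spark.read.format("bigquery").option("table", input_table).load()

    # GHCN temperature values are stored in tenths of a degree Celsius.
    df = df.withColumn("value", F.col("value") / 10)

    # Write the normalized data back to BigQuery using the direct write method.
    df.write.format("bigquery").option("writeMethod", "direct").save(output_table)

    spark.stop()
```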
Upload the PySpark file to Cloud Storage
To upload the PySpark file to your Cloud Storage bucket (the dataset stored in holidays.csv is uploaded to AWS S3 in the next section):
- Save data_analytics_process.py to your local machine.
- In the Google Cloud console, go to the Cloud Storage browser page.
- Click the name of the bucket you created earlier.
- In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py in the dialog that appears, and click Open.
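As an alternative to the console, a minimal sketch using the google-cloud-storage client library could look like this; the bucket name is a placeholder for the bucket you created earlier.

```python
# Sketch: upload data_analytics_process.py with the Cloud Storage client library.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-composer-tutorial-bucket")  # placeholder bucket name
blob = bucket.blob("data_analytics_process.py")
blob.upload_from_filename("data_analytics_process.py")
print(f"Uploaded to gs://{bucket.name}/{blob.name}")
```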
Upload the CSV file to AWS S3
To upload the holidays.csv file:
- Save holidays.csv on your local machine.
- Follow the AWS guide to upload the file to your bucket (a scripted alternative is sketched below).
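A minimal boto3 sketch, assuming your AWS credentials are configured locally (for example with aws configure) and using a placeholder bucket name:

```python
# Sketch: upload holidays.csv to the S3 bucket with boto3.
import boto3

s3 = boto3.client("s3")
s3.upload_file("holidays.csv", "your-tutorial-s3-bucket", "holidays.csv")  # placeholder bucket
```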
Data analytics DAG
Explore the example workflow
The workflow uses multiple operators to transform and unify the data:
- The S3ToGCSOperator transfers the holidays.csv file from your AWS S3 bucket to your Cloud Storage bucket.
- The GCSToBigQueryOperator ingests the holidays.csv file from Cloud Storage to a new table in the BigQuery holiday_weather dataset you created earlier.
- The DataprocCreateBatchOperator creates and runs a PySpark batch job using Dataproc Serverless.
- The BigQueryInsertJobOperator joins the data from holidays.csv on the "Date" column with weather data from the BigQuery public dataset ghcn_d. The BigQueryInsertJobOperator tasks are dynamically generated using a for loop, and these tasks are in a TaskGroup for better readability in the Graph view of the Airflow UI.
A sketch of how these operators can be wired together in a DAG follows below.
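This is a minimal sketch, not the tutorial's exact s3togcsoperator_tutorial.py: the task IDs, table names, batch ID, and batch payload details are illustrative assumptions, and the BigQueryInsertJobOperator join tasks are only indicated in a comment.

```python
# Sketch of a DAG wiring the operators described above. Task IDs, table
# names, and the batch payload are illustrative, not the tutorial's exact DAG.
import datetime

from airflow import models
from airflow.providers.amazon.aws.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

PROJECT_ID = models.Variable.get("gcp_project")
GCS_BUCKET = models.Variable.get("gcs_bucket")
S3_BUCKET = models.Variable.get("s3_bucket")

with models.DAG(
    "s3_to_gcs_dag",
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@once",
    catchup=False,
) as dag:
    # Copy holidays.csv from AWS S3 to Cloud Storage using the Airflow
    # connection created earlier.
    s3_to_gcs = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET,
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{GCS_BUCKET}",
    )

    # Load the CSV from Cloud Storage into a BigQuery table.
    load_holidays = GCSToBigQueryOperator(
        task_id="load_holidays_csv",
        bucket=GCS_BUCKET,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{PROJECT_ID}.holiday_weather.holidays",
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )

    # Run the PySpark normalization job on Dataproc Serverless.
    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_ID,
        region=models.Variable.get("gce_region"),
        batch={
            "pyspark_batch": {
                "main_python_file_uri": f"gs://{GCS_BUCKET}/data_analytics_process.py",
            },
            "environment_config": {
                "execution_config": {
                    "service_account": models.Variable.get("dataproc_service_account"),
                },
            },
        },
        batch_id="data-processing-batch",  # must be unique per project and region
    )

    # The BigQueryInsertJobOperator tasks that join holidays.csv with ghcn_d
    # (generated in a for loop inside a TaskGroup) would sit between
    # load_holidays and create_batch.
    s3_to_gcs >> load_holidays >> create_batch
```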
Use the Airflow UI to add variables
In Airflow, variables are a universal way to store and retrieve arbitrary settings or configurations as a simple key-value store. This DAG uses Airflow variables to store common values; a sketch of how the DAG reads them follows the list below. To add them to your environment:
- Access the Airflow UI from the Cloud Composer console
- Go to Admin > Variables
- Add the following variables:
  - s3_bucket: the name of the S3 bucket you created earlier
  - gcp_project: your project ID
  - gcs_bucket: the name of the Cloud Storage bucket you created earlier, without the "gs://" prefix
  - gce_region: the region where you want to run your Dataproc Serverless job; it must meet the Dataproc Serverless networking requirements. This is the region where you enabled Private Google Access earlier.
  - dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.
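The DAG reads these values with models.Variable.get. A minimal sketch, using the variable keys defined above (the constant names are illustrative):

```python
# Sketch: reading the Airflow variables defined above at DAG parse time.
from airflow import models

S3_BUCKET = models.Variable.get("s3_bucket")                   # source S3 bucket
PROJECT_ID = models.Variable.get("gcp_project")                # Google Cloud project ID
GCS_BUCKET = models.Variable.get("gcs_bucket")                 # destination Cloud Storage bucket
REGION = models.Variable.get("gce_region")                     # Dataproc Serverless region
DATAPROC_SA = models.Variable.get("dataproc_service_account")  # batch job service account
```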
Upload the DAG to your environment's bucket
Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:
- On your local machine, save s3togcsoperator_tutorial.py.
- In the Google Cloud console, go to the Environments page.
- In the list of environments, click the DAGs link in the DAG folder column. The DAGs folder of your environment opens.
- Click Upload files.
- Select s3togcsoperator_tutorial.py on your local machine and click Open.
Trigger the DAG
- In your Cloud Composer environment, click the DAGs tab
- Click the DAG ID s3_to_gcs_dag
- Click Trigger DAG
- Wait about five to ten minutes until you see a green check indicating the tasks have been completed successfully.
Validate the DAG's success
- In the Google Cloud console, go to the BigQuery page.
- In the Explorer panel, click your project name.
- Click holidays_weather_joined.
- Click Preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.
- Click holidays_weather_normalized.
- Click Preview to view the resulting table. Note that the numbers in the value column are in degrees Celsius.
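You can also preview the results programmatically. A minimal sketch with the BigQuery client library, assuming both tables were written to the holiday_weather dataset you created earlier:

```python
# Sketch: preview the normalized table with the google-cloud-bigquery client.
# Assumes the table lives in the holiday_weather dataset created earlier.
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT *
    FROM `holiday_weather.holidays_weather_normalized`
    LIMIT 10
"""
for row in client.query(query).result():
    print(dict(row))
```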
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
- Delete the holidays.csv file in your AWS S3 bucket.
- Delete the AWS S3 bucket that you created.
- In the Google Cloud console, go to the Cloud Storage Browser page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
- Delete the BigQuery dataset
- Delete the Cloud Composer environment