Serverless orchestration: Loading data from Cloud Storage to BigQuery using Workflows

This tutorial shows how to reliably run serverless workflows using Workflows, Cloud Functions, and Firestore to load raw data, such as event logs, from Cloud Storage to BigQuery. Analytics platforms commonly have an orchestration tool to periodically load data into BigQuery using BigQuery jobs, and then transform the data to provide business metrics using SQL statements and BigQuery scripting. This tutorial is intended for developers and architects who want to build serverless, event-driven data processing pipelines. The tutorial assumes that you are familiar with YAML, SQL, and Python.

Architecture

The following diagram shows the high-level architecture of a serverless extract, load, and transform (ELT) pipeline using Workflows.

Extract, load, and transform pipeline.

In the preceding diagram, consider a retail platform that periodically collects sales events as files from various stores and then writes the files to a Cloud Storage bucket. The events are imported into BigQuery and processed there to provide business metrics. This architecture provides a reliable, serverless orchestration system to import your files into BigQuery, and is divided into the following two modules:

  • File list: Maintains the list of unprocessed files added to a Cloud Storage bucket in a Firestore collection. This module works through a Cloud Function that is triggered by an Object Finalize storage event, which is generated when a new file is added to the Cloud Storage bucket. The filename is appended to the files array of the document named new in the Firestore jobs collection (see the sketch after this list).
  • Workflow: Runs the scheduled workflows. Cloud Scheduler triggers a workflow that runs a series of steps according to a YAML-based syntax to orchestrate loading, and then transforming the data in BigQuery by calling Cloud Functions. The steps in the workflow call Cloud Functions to run the following tasks:

    • Create and start a BigQuery load job.
    • Poll the load job status.
    • Create and start the transform query job.
    • Poll the transform job status.
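
The following is a minimal, hypothetical sketch of the file-tracking Cloud Function described above, written as a Python background function. It assumes the names used in this tutorial (a jobs collection, a new document, a files array, and the file_add_handler function deployed by Terraform); the actual function in the cloned repository may differ in detail.

    # Hypothetical sketch of the file-tracking function (file_add_handler).
    # Assumes a `jobs` collection with a `new` document holding a `files` array.
    from google.cloud import firestore

    db = firestore.Client()

    def file_add_handler(event, context):
        """Triggered by a Cloud Storage Object Finalize event."""
        filename = event["name"]  # Name of the object that was just written.
        new_doc = db.collection("jobs").document("new")
        # ArrayUnion appends atomically, so concurrent uploads don't
        # overwrite each other's entries.
        new_doc.set({"files": firestore.ArrayUnion([filename])}, merge=True)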

Using transactions to maintain the list of new files in Firestore helps ensure that no file is missed when a workflow imports them into BigQuery. Separate runs of the workflow are made idempotent by storing job metadata and status in Firestore.
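
To illustrate that idempotency pattern, the following is a hedged sketch of how a job-creation function could claim the pending file list inside a Firestore transaction. The field names (files, job_type, job_status) follow the conventions used later in this tutorial; everything else is an assumption, not the tutorial's actual code.

    # Sketch only: claim the pending file list atomically so that a file is
    # neither imported twice nor skipped when a new job is created.
    import uuid

    from google.cloud import firestore

    db = firestore.Client()

    @firestore.transactional
    def claim_files(transaction, new_ref, job_ref):
        snapshot = new_ref.get(transaction=transaction)
        files = (snapshot.to_dict() or {}).get("files", [])
        if not files:
            return []  # Nothing to load, so don't create a job.
        # Record the job (type 0 = load, status 0 = created) and clear the
        # pending list in the same transaction.
        transaction.set(job_ref, {"files": files, "job_type": 0, "job_status": 0})
        transaction.update(new_ref, {"files": []})
        return files

    def create_load_job():
        job_id = str(uuid.uuid4())
        new_ref = db.collection("jobs").document("new")
        job_ref = db.collection("jobs").document(job_id)
        files = claim_files(db.transaction(), new_ref, job_ref)
        return job_id if files else None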

Objectives

  • Create a Firestore database.
  • Set up a Cloud Function trigger to track files added to the Cloud Storage bucket in Firestore.
  • Deploy Cloud Functions to run and monitor BigQuery jobs.
  • Deploy and run a workflow to automate the process.

Costs

This tutorial uses the following billable components of Google Cloud:

  • BigQuery
  • Cloud Functions
  • Cloud Storage
  • Firestore
  • Workflows

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  3. Enable the BigQuery, Cloud Build, Cloud Functions, Cloud Storage, Firestore, and Workflows APIs.

    Enable the APIs

  4. Go to the Dashboard page and make a note of the Project ID because it's used in a later step.

    Go to Dashboard

  5. In the Cloud Console, activate Cloud Shell.

    Activate Cloud Shell

Preparing your environment

  1. To create a Firestore database, do the following:

    1. In the Cloud Console, go to the Firestore page.

      Go to Firestore

    2. Click Select native mode.

    3. In the Select a location menu, select the region where you want to host the Firestore database. We recommend picking a region that is close to your physical location.

    4. Click Create database.

  2. In Cloud Shell, clone the source repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-workflows-load
    cd bigquery-workflows-load
    
  3. In Cloud Shell, create the following resources using Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Replace the following:

    • PROJECT_ID: your Google Cloud project ID
    • REGION: a specific Google Cloud geographical location to host your resources—for example, us-central1
    • ZONE: a location within a region to host your resources—for example, us-central1-b

    Terraform can help you create, change, and upgrade infrastructure at scale safely and predictably. The following resources are created in your project:

    • Service accounts with the required privileges to ensure secure access to your resources.
    • A BigQuery dataset named serverless_elt_dataset and a table named word_count to load the incoming files.
    • A Cloud Storage bucket named ${project_id}-ordersbucket for staging input files.
    • The following five Cloud Functions:

      • file_add_handler adds the name of the files that are added to the Cloud Storage bucket to the Firestore collection.
      • create_job creates a new BigQuery load job and associates files in the Firestore collection with the job.
      • create_query creates a new BigQuery query job.
      • poll_bigquery_job gets the status of a BigQuery job.
      • run_bigquery_job starts a BigQuery job.

  4. Get the URLs for the create_job, create_query, poll_bigquery_job, and run_bigquery_job Cloud Functions that you deployed in the previous step. Repeat this step for each function.

    gcloud functions describe FUNCTION_NAME | grep url
    

    Replace the following:

    • FUNCTION_NAME: the Cloud Function name, such as create_job.

    The output is similar to the following:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/FUNCTION_NAME
    

    Make a note of these URLs because they are used in the following step.

  5. Edit the workflow file:

    • In Cloud Shell, click Open Editor.
    • To open the file in the editor, go to bigquery-workflows-load and then click workflow.yaml.
    • Edit the following values in the section named constants:

      • create_job_url: CREATE_JOB_URL
      • create_query_url: CREATE_QUERY_URL
      • poll_bigquery_job_url: POLL_BIGQUERY_JOB_URL
      • run_bigquery_job_url: RUN_BIGQUERY_JOB_URL
      • region: BIGQUERY_REGION
      • table_name: PROJECT_ID.serverless_elt_dataset.word_count

      Replace the following (for the URL values, use the URLs that you noted in the previous step):

      • CREATE_JOB_URL: the URL of the function to create a new load job
      • CREATE_QUERY_URL: the URL of the function to create a new query job
      • POLL_BIGQUERY_JOB_URL: the URL of the function to poll the status of a running job
      • RUN_BIGQUERY_JOB_URL: the URL of the function to start a BigQuery job
      • BIGQUERY_REGION: the BigQuery region where data is stored—for example, US
      • PROJECT_ID: your Google Cloud project ID
  6. Deploy the workflow file:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Replace the following:

    • WORKFLOW_NAME: the unique name of the workflow
    • WORKFLOW_REGION: the region in which the workflow is deployed—for example, us-central1
    • WORKFLOW_DESCRIPTION: the description of the workflow
  7. Create a Python 3 virtual environment and install requirements for the file generator:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Generating files to import

The gen.py Python script generates random content in Avro format. The schema is the same as the BigQuery word_count table. These Avro files are copied to the specified Cloud Storage bucket.
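
If you're curious what the generator does under the hood, the following is a simplified, hypothetical sketch of the same flow using fastavro and the Cloud Storage client. The word/count schema shown here is only an illustrative guess; see gen.py in the cloned repository for the actual schema and options.

    # Simplified, hypothetical sketch of the generator flow: write an Avro
    # file and upload it to the orders bucket. The schema below is only an
    # illustrative guess; the real schema lives in gen.py.
    import random
    import string

    from fastavro import parse_schema, writer
    from google.cloud import storage

    SCHEMA = parse_schema({
        "type": "record",
        "name": "WordCount",
        "fields": [
            {"name": "word", "type": "string"},
            {"name": "count", "type": "long"},
        ],
    })

    def generate_file(bucket_name, blob_name, num_records):
        records = [
            {"word": "".join(random.choices(string.ascii_lowercase, k=8)),
             "count": random.randint(1, 100)}
            for _ in range(num_records)
        ]
        local_path = "/tmp/" + blob_name
        with open(local_path, "wb") as out:
            writer(out, SCHEMA, records)
        storage.Client().bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)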

  • In Cloud Shell, generate the files:

    python gen.py -p PROJECT_ID \
        -o PROJECT_ID-ordersbucket \
        -n RECORDS_PER_FILE \
        -f NUM_FILES \
        -x FILE_PREFIX
    

    Replace the following:

    • RECORDS_PER_FILE: the number of records in a single file
    • NUM_FILES: the total number of files to be uploaded
    • FILE_PREFIX: the prefix for the names of the generated files

View file entries in Firestore

When the files are copied to Cloud Storage, the file_add_handler Cloud Function is triggered. This function adds the filenames to the files array of the document named new in the Firestore jobs collection.

  • To view the file list, in the Cloud Console, go to the Firestore Data page.

    Go to Data

    List of files added to the collection.

Trigger the workflow

Workflows links together a series of serverless tasks from Google Cloud services and APIs. Individual steps in this workflow run as Cloud Functions, and the state is stored in Firestore. All calls to Cloud Functions are authenticated by using the service account of the workflow.

  • In Cloud Shell, run the workflow:

    gcloud workflows execute WORKFLOW_NAME
    
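
If you prefer to start the workflow programmatically instead of with the gcloud command, the following sketch uses the Workflows Executions client library for Python. This is an assumption-laden example: it assumes the google-cloud-workflows package is installed and that your credentials are allowed to execute the workflow.

    # Hypothetical alternative to `gcloud workflows execute`, using the
    # Workflows Executions client library (package: google-cloud-workflows).
    from google.cloud.workflows import executions_v1

    def run_workflow(project_id, region, workflow_name):
        client = executions_v1.ExecutionsClient()
        parent = f"projects/{project_id}/locations/{region}/workflows/{workflow_name}"
        execution = client.create_execution(request={"parent": parent})
        print("Started execution:", execution.name)
        return execution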

The following diagram shows the steps used in the workflow:

Steps used in the main and sub workflow.

The workflow is split into two parts: the main workflow and the sub workflow. The main workflow handles job creation and conditional execution while the sub workflow executes a BigQuery job. The workflow performs the following operations:

  • The create_job Cloud Function creates a new job object, gets the list of files added to Cloud Storage from the Firestore document, and associates the files with the load job. If there are no files to load, the function doesn't create a new job.
  • The create_query Cloud Function takes the query that needs to be executed along with the BigQuery region that the query should be executed in. The function creates the job in Firestore and returns the job ID.
  • The run_bigquery_job Cloud Function gets the ID of the job that needs to be executed, and then calls the BigQuery API to submit the job.
  • Instead of waiting for the job to complete inside a Cloud Function, the workflow periodically polls the status of the job (see the sketch after this list):
    • The poll_bigquery_job Cloud Function provides the status of the job. It is called repeatedly until the job completes.
    • To add a delay between calls to the poll_bigquery_job Cloud Function, a sleep routine is called from Workflows.
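
To make the submit-and-poll pattern concrete, here is a hedged Python sketch of what the job-submission and polling functions might do with the BigQuery client library. The numeric codes match the job_status values listed in the View the job status section; the function bodies themselves are assumptions, not the tutorial's actual code.

    # Sketch only: submit a BigQuery load job and poll its status without
    # blocking a Cloud Function while the job runs.
    from google.cloud import bigquery

    client = bigquery.Client()

    def start_load_job(gcs_uris, table_id, region):
        """Submit an Avro load job and return its BigQuery job ID."""
        job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
        job = client.load_table_from_uri(
            gcs_uris, table_id, location=region, job_config=job_config
        )
        return job.job_id  # Submitted asynchronously; don't wait here.

    def poll_job(job_id, region):
        """Map the BigQuery job state to the tutorial's job_status codes."""
        job = client.get_job(job_id, location=region)
        if job.state in ("PENDING", "RUNNING"):
            return 1  # running
        if job.error_result:
            return 3  # failed
        return 2  # completed successfully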

View the job status

  1. To view the file list and the status of the job, in the Cloud Console, go to the Firestore Data page.

    Go to Data

  2. A universally unique identifier (UUID) is generated for each job. To view the job_type and job_status, click the job ID. Each job has one of the following types and statuses:

  • job_type: The type of job that is being run by the workflow with one of the following values:

    • 0: Load data into BigQuery.
    • 1: Run a query in BigQuery.
  • job_status: The current state of the job with one of the following values:

    • 0: The job has been created, but not started.
    • 1: The job is running.
    • 2: The job completed its execution successfully.
    • 3: There was an error and the job did not complete successfully.

The job object also contains metadata attributes such as the region of the BigQuery dataset, the name of the BigQuery table, and if it's a query job, the query string being run.
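
If you want to check the same information from code rather than the console, a small sketch like the following reads the job documents with the Firestore client. The collection and field names are the ones used in this tutorial; everything else is an assumption.

    # Sketch: print each job's type and status from the Firestore `jobs`
    # collection, using the code values described above.
    from google.cloud import firestore

    JOB_TYPES = {0: "load", 1: "query"}
    JOB_STATUSES = {0: "created", 1: "running", 2: "completed", 3: "failed"}

    db = firestore.Client()
    for doc in db.collection("jobs").stream():
        if doc.id == "new":
            continue  # The `new` document tracks pending files, not a job.
        data = doc.to_dict()
        print(doc.id,
              JOB_TYPES.get(data.get("job_type"), "unknown"),
              JOB_STATUSES.get(data.get("job_status"), "unknown"))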

List of files with job status highlighted.

View data in BigQuery

To confirm that the ELT job was successful, verify that the data appears in the table:

  1. In the Cloud Console, go to the BigQuery Editor page.

    Go to Editor

  2. Click the serverless_elt_dataset.word_count table.

  3. Click the Preview tab.

    Preview tab showing data in table.
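
As an alternative to the console preview, you can verify the load from code. The following is a minimal sketch with the BigQuery Python client, assuming default credentials and the dataset and table created by Terraform in this tutorial.

    # Sketch: confirm that rows were loaded into the word_count table.
    from google.cloud import bigquery

    client = bigquery.Client()
    query = (
        "SELECT COUNT(*) AS row_count "
        "FROM `serverless_elt_dataset.word_count`"
    )
    for row in client.query(query).result():
        print("Rows loaded:", row.row_count)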

Scheduling

To run the workflow periodically, you can use Cloud Scheduler.
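
For example, the following hedged sketch uses the Cloud Scheduler client library for Python to create an hourly job that calls the Workflow Executions API. The schedule, job name, and service account are placeholders; you can achieve the same result with a gcloud scheduler jobs create http command.

    # Hypothetical sketch: schedule an hourly workflow execution with Cloud
    # Scheduler (package: google-cloud-scheduler). Names below are placeholders.
    from google.cloud import scheduler_v1

    def schedule_workflow(project_id, region, workflow_name, service_account_email):
        client = scheduler_v1.CloudSchedulerClient()
        parent = f"projects/{project_id}/locations/{region}"
        job = scheduler_v1.Job(
            name=f"{parent}/jobs/run-{workflow_name}",
            schedule="0 * * * *",  # Every hour, on the hour.
            time_zone="Etc/UTC",
            http_target=scheduler_v1.HttpTarget(
                uri=(
                    "https://workflowexecutions.googleapis.com/v1/"
                    f"projects/{project_id}/locations/{region}/"
                    f"workflows/{workflow_name}/executions"
                ),
                http_method=scheduler_v1.HttpMethod.POST,
                oauth_token=scheduler_v1.OAuthToken(
                    service_account_email=service_account_email
                ),
            ),
        )
        return client.create_job(parent=parent, job=job)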

Clean up

The easiest way to eliminate billing is to delete the Cloud project that you created for the tutorial. Alternatively, you can delete the individual resources.

Delete the individual resources

  1. In Cloud Shell, remove all resources created using Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    
  2. In the Cloud Console, go to the Firestore Data page.

    Go to Data

  3. Next to Jobs, click Menu and select Delete.

    Menu path to delete a collection.

Delete the project

  1. In the Cloud Console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next