Orchestrate Data Pipelines using Workflows

When working on data engineering problems, the ability to control execution, access operational information such as historical runs and logs, and restart jobs from the point of failure is critical to pipeline development. One of the most common ways to achieve this in Google Cloud is Cloud Composer (based on Apache Airflow).

If you’re looking for a serverless alternative, you can use Workflows to link a series of tasks together in the order you define. In this post, we’ll use Workflows to orchestrate a Dataflow pipeline on Google Cloud. Let’s get started!

What are Workflows?

A workflow is made up of a series of steps described using the Workflows syntax, which can be written in either the YAML or JSON format. This is the workflow's definition. For a detailed explanation of the Workflows syntax, see the Syntax reference page.


When a workflow is created, it is deployed, which makes the workflow ready for execution.

An execution is a single run of the logic contained in a workflow's definition.
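
To make the terms above concrete, here is a minimal sketch of a workflow definition in YAML. It is an illustration rather than part of the pipeline in this post: the step names and the `name` argument are hypothetical. Deploying this definition creates the workflow, and each run of it with a given argument is one execution.

```yaml
# Minimal workflow definition: two steps that run in the order listed.
main:
  params: [args]            # runtime arguments supplied when an execution starts
  steps:
    - build_greeting:
        assign:
          # "name" is a hypothetical key in the runtime arguments
          - greeting: ${"Hello, " + args.name}
    - finish:
        return: ${greeting}
```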

Why Workflows?

Workflows makes the developer experience easier from both a cost and an efficiency standpoint. Here’s how:

  • Pay per use - Workflows scales to zero when not in use and incurs no cost while idle. Pricing is based on the number of steps executed, so you only pay when your workflow runs.

  • Serverless - no underlying infrastructure for you to manage. Workflows scales up automatically with no “cold start” effect.

  • Well integrated - manage events across Google Cloud products or any HTTP-based APIs, including SaaS or private APIs.

  • Longer-running operations - serverless products such as Cloud Functions can also be used for orchestration, but a function has a maximum duration of 9 minutes, which can be tricky for longer-running workloads. Workflows executions, on the other hand, can run for up to a year.

  • Built-in error handling - Workflows’ exception handling, including automated HTTP call retries with exponential backoff, custom error handlers, and other advanced features, helps you build resilient workflows and customize the flow in the event of a failure.
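
As a rough sketch of the retry behavior described in the last bullet, the step below wraps an HTTP call in a `try` block and attaches a retry policy with exponential backoff. The URL is a hypothetical placeholder, and the retry counts and delays are illustrative values, not recommendations from this post.

```yaml
# Sketch only: retry a flaky HTTP call with exponential backoff.
- call_api_with_retry:
    try:
      call: http.get
      args:
        url: https://example.com/api    # hypothetical endpoint
      result: api_response
    retry:
      # Built-in predicate that retries on common transient HTTP errors
      predicate: ${http.default_retry_predicate}
      max_retries: 5
      backoff:
        initial_delay: 2    # seconds before the first retry
        max_delay: 60       # upper bound on the delay between retries
        multiplier: 2       # delay doubles after each attempt
```

With these values the delays between attempts would grow as 2, 4, 8, 16, and 32 seconds, capped at 60.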

Orchestrating data pipelines using Workflows

Below is the flow of our pipeline and corresponding steps:

[Figure: pipeline flow diagram]

Pipeline Steps

In this pipeline, an input file lands in a Cloud Storage (GCS) bucket. A Dataflow job reads the data and writes it to BigQuery, and a Cloud Function then archives the file. The detailed steps are outlined below:

  1. A Cloud Scheduler job triggers the workflow on a schedule

  2. Workflows launches a batch Dataflow job in the create_dataflow_job step

  3. Dataflow job reads the input file from the ingestion GCS bucket

  4. After completion of data transformation, the Dataflow job writes data to the BigQuery table

  5. Upon completion of the Dataflow job, Workflows calls a Cloud Function in the call_file_archival_function step to archive the input file

  6. Cloud Function moves the input file from a ‘processing’ to a ‘processed’ Cloud Storage bucket

Cloud Scheduler to Trigger Workflow

In this data pipeline, a Cloud Scheduler job triggers the Workflows execution. For details, see the documentation on scheduling a workflow using Cloud Scheduler.

The following gcloud command creates a Cloud Scheduler job that triggers your workflow, using a service account for authentication. The example below schedules the workflow to be executed every 5 minutes:

  # --message-body carries the workflow's runtime arguments as a JSON-encoded string
  gcloud scheduler jobs create http JOB_NAME \
    --schedule="*/5 * * * *" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/PROJECT_NAME/locations/REGION_NAME/workflows/WORKFLOW_NAME/executions" \
    --message-body="{\"argument\": \"DOUBLE_ESCAPED_JSON_STRING\"}" \
    --time-zone="TIME_ZONE" \
    --oauth-service-account-email="SERVICE_ACCOUNT_NAME@PROJECT_NAME.iam.gserviceaccount.com"

Workflow YAML

  1. Kicking off the Dataflow job

  # Launch a batch Dataflow job from a template stored in Cloud Storage
  - create_dataflow_job:
      call: googleapis.dataflow.v1b3.projects.locations.templates.create
      args:
        projectId: ${args.project_id}
        location: ${args.location}
        body:
          jobName: ${args.jobName}
          parameters:
            inputPattern: ${args.inputPattern}
            columnMapping: ${args.columnMapping}
            destinationBQTable: ${args.destinationBQTable}
            # CurrentDateTime is the result of an earlier step not shown in this excerpt
            ingestionTime: ${CurrentDateTime.body}
            fileHeaderAppender: ${args.fileHeaderAppender}
          environment:
            tempLocation: ${args.temp_location}
            bypassTempDirValidation: false
          gcsPath: ${args.template_path}
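
The `ingestionTime` parameter above reads from `CurrentDateTime`, the result of a step that does not appear in this excerpt. As a rough sketch of one way to produce a timestamp without an external call, the step below uses the Workflows standard library functions `sys.now` and `time.format`. The step name `get_ingestion_time` and the variable `ingestion_time` are hypothetical, and the timestamp format the Dataflow template expects is not stated here, so treat the output format as an assumption.

```yaml
# Sketch only: capture the current time before launching the Dataflow job.
- get_ingestion_time:
    assign:
      # sys.now() returns seconds since the Unix epoch;
      # time.format() renders that value as an ISO 8601 timestamp string.
      - ingestion_time: ${time.format(sys.now())}
```

With this variant, the job parameter would reference `${ingestion_time}` instead of `${CurrentDateTime.body}`.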

  2. Triggering the Cloud Function to archive the input file after successful processing

  # Look up the deployed function to obtain its HTTPS trigger URL.
  # projectId, location, and funcName are assumed to be assigned in an earlier step.
  - get_function:
      call: googleapis.cloudfunctions.v1.projects.locations.functions.get
      args:
        name: ${"projects/" + projectId + "/locations/" + location + "/functions/" + funcName}
      result: function

  - call_file_archival_function:
      try:
        call: http.post
        args:
          url: ${function.httpsTrigger.url}
          body: {inputs-for-your-cloud-function}  # placeholder: replace with the function's inputs
        result: func_response
      except:
        as: e
        steps:
          - known_errors:
              switch:
                - condition: ${e.code == 400}
                  return: "Malformed syntax, please check your inputs"
          - unhandled_exception:
              raise: ${e}
  - the_end:
      return: ${func_response.body}
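
The two fragments above read `args`, `projectId`, `location`, and `funcName`, none of which are defined in the excerpts. Here is a minimal sketch of how they might be wired up, assuming the runtime arguments arrive as a single `args` dictionary. The `init` step name and the `function_name` argument key are hypothetical.

```yaml
main:
  params: [args]            # runtime arguments passed when the execution starts
  steps:
    - init:
        assign:
          # Project the workflow is deployed in (built-in environment variable)
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - location: ${args.location}
          - funcName: ${args.function_name}   # hypothetical argument key
    # The create_dataflow_job, get_function, call_file_archival_function,
    # and the_end steps would follow here, in that order.
```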

Workflow Job Graph

[Figure: workflow job graph]

In this post, we discussed how Workflows can be used to orchestrate data pipelines on Google Cloud. Workflows' rapid scaling, out-of-the-box exception handling, built-in authentication, and integration with Cloud Logging make it a great fit for serverless implementations.

Interested in exploring more Workflows use cases?

To get started with Workflows, refer to our quickstart guides and tutorials. Also, check out the newly released Workflows connectors, which come with built-in authentication and error handling and integrate seamlessly with Google Cloud APIs.