Orchestrate Data Pipelines using Workflows
Preetika Bhateja
Strategic Cloud Engineer
Kunal Kumar Gupta
Strategic Cloud Engineer
When working on data engineering problems, the ability to manage execution control, access operational information such as historical runs and logs, and restart jobs from the point of failure is critical to overall pipeline development. One of the most common ways to achieve this in Google Cloud is using Cloud Composer (based on Apache Airflow).
If you’re looking for a serverless alternative, you can use Workflows to create serverless workflows that link a series of tasks together in the order you define. In this blog, we’ll use Workflows to orchestrate a Dataflow pipeline on GCP. Let’s get started!
What are Workflows?
A workflow is made up of a series of steps described using the Workflows syntax, which can be written in either the YAML or JSON format. This is the workflow's definition. For a detailed explanation of the Workflows syntax, see the Syntax reference page.
When a workflow is created, it is deployed, which makes the workflow ready for execution.
An execution is a single run of the logic contained in a workflow's definition.
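For illustration, here is a minimal sketch of a workflow definition in YAML (the input parameter and greeting logic are purely hypothetical); it accepts a runtime argument, builds a value in one step, and returns it in the next:

```yaml
# A minimal two-step workflow definition (hypothetical example).
main:
  params: [input]   # runtime arguments passed at execution time, e.g. {"name": "Workflows"}
  steps:
    - build_greeting:
        assign:
          - greeting: ${"Hello, " + input.name}
    - return_greeting:
        return: ${greeting}
```

Each execution of this workflow runs these two steps once and returns the resulting string.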
Why Workflows?
Workflows is key to making the developer experience easier from a cost and efficiency standpoint. Here are a few examples of how:
Pay per use - Workflows scales to zero when not in use, incurring no cost while idle. Pricing is based on the number of steps executed, so you only pay when your workflow runs.
Serverless - no underlying infrastructure for you to manage. Workflows scales up automatically with no “cold start” effect.
Well integrated - manage events across Google Cloud products or any HTTP-based APIs, including SaaS or private APIs.
Longer-running operations - serverless products such as Cloud Functions can also be used for orchestration, but their maximum execution duration of 9 minutes can be limiting for longer-running workloads. Workflows executions, on the other hand, can run for up to a year.
Built-in error handling - Workflows’ exception handling, including automated HTTP call retries with exponential backoff, custom error handlers, and other advanced features, helps you build resilient workflows and customize the flow in the event of a failure, as illustrated in the sketch after this list.
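As a sketch of that error handling, the following hypothetical workflow wraps an HTTP call to a placeholder URL in a try/retry/except block with a custom exponential backoff policy:

```yaml
# Hypothetical example: retry a flaky HTTP call, then handle the error if retries are exhausted.
main:
  steps:
    - call_external_api:
        try:
          call: http.get
          args:
            url: https://example.com/api/status   # placeholder endpoint
          result: apiResponse
        retry:
          predicate: ${http.default_retry_predicate}   # retry on transient HTTP errors
          max_retries: 5
          backoff:
            initial_delay: 2
            max_delay: 60
            multiplier: 2
        except:
          as: e
          steps:
            - log_error:
                call: sys.log
                args:
                  text: ${"API call failed after retries: " + e.message}
                  severity: ERROR
            - reraise:
                raise: ${e}
    - done:
        return: ${apiResponse.body}
```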
Orchestrating data pipelines using Workflows
Below is the flow of our pipeline and corresponding steps:
Pipeline Steps
In this pipeline, an input file lands in a GCS bucket. A Dataflow job reads the data and stores it in BigQuery, followed by a Cloud Function that archives the file. Detailed steps are outlined below:
1. A Cloud Scheduler job triggers the Workflows execution on a schedule.
2. Workflows triggers a batch Dataflow job by calling the create_dataflow_job step.
3. The Dataflow job reads the input file from the ingestion GCS bucket.
4. After the data transformation completes, the Dataflow job writes the data to the BigQuery table.
5. Upon completion of the Dataflow job, Workflows calls the call_file_archival_function step, which triggers a Cloud Function to archive the input file.
6. The Cloud Function moves the input file from a ‘processing’ to a ‘processed’ Cloud Storage bucket.
Cloud Scheduler to Trigger Workflow
In this data pipeline, a Cloud Scheduler job is utilized to trigger the Workflows execution. Learn how to schedule a Workflow using Cloud Scheduler here.
The following gcloud command creates a Cloud Scheduler job that triggers your workflow, using a service account for authentication. The example below schedules the workflow to be executed every 5 minutes:
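A minimal sketch of such a command, assuming a workflow named my-workflow in us-central1 and a service account that has the Workflows Invoker role (all names below are placeholders):

```sh
# Create a Cloud Scheduler job that calls the Workflows Executions API every 5 minutes.
gcloud scheduler jobs create http trigger-my-workflow \
  --schedule="*/5 * * * *" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/my-project/locations/us-central1/workflows/my-workflow/executions" \
  --message-body="{}" \
  --oauth-service-account-email="workflow-invoker@my-project.iam.gserviceaccount.com"
```

The scheduler job authenticates with an OAuth token for the given service account and posts a request to create a new workflow execution on each run.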
Workflow YAML
1. Kicking off the Dataflow job
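A sketch of this step, assuming a classic Dataflow template staged at gs://my-bucket/templates/my-dataflow-template and placeholder project, region, bucket, and table names; the excerpt below (part of the workflow’s steps list) launches the job through the Dataflow REST API and polls its state until it reaches JOB_STATE_DONE:

```yaml
# Excerpt from the workflow's steps list. All resource names are placeholders.
- create_dataflow_job:
    call: http.post
    args:
      url: "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates:launch"
      auth:
        type: OAuth2
      query:
        gcsPath: gs://my-bucket/templates/my-dataflow-template
      body:
        jobName: gcs-to-bigquery-ingest
        parameters:
          inputFilePattern: gs://my-ingestion-bucket/input/*.csv
          outputTable: my-project:my_dataset.my_table
        environment:
          tempLocation: gs://my-bucket/temp
    result: dataflowResponse
- get_job_status:
    call: http.get
    args:
      url: ${"https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/jobs/" + dataflowResponse.body.job.id}
      auth:
        type: OAuth2
    result: jobStatus
- check_job_status:
    switch:
      - condition: ${jobStatus.body.currentState == "JOB_STATE_DONE"}
        next: call_file_archival_function
      - condition: ${jobStatus.body.currentState == "JOB_STATE_FAILED"}
        next: dataflow_job_failed
    next: wait_before_polling_again
- wait_before_polling_again:
    call: sys.sleep
    args:
      seconds: 30
    next: get_job_status
- dataflow_job_failed:
    raise: "Dataflow job failed; check the Dataflow console for details."
```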
2. Triggering the Cloud Function to archive the input file after successful processing
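A sketch of the archival step, assuming an HTTP-triggered Cloud Function (the function URL and bucket names below are placeholders); the call authenticates with an OIDC identity token, which Cloud Functions expects:

```yaml
# Excerpt from the workflow's steps list. The function URL and buckets are placeholders.
- call_file_archival_function:
    call: http.post
    args:
      url: "https://us-central1-my-project.cloudfunctions.net/archive-input-file"
      auth:
        type: OIDC
      body:
        sourceBucket: my-ingestion-bucket
        destinationBucket: my-processed-bucket
    result: archivalResponse
- workflow_complete:
    return: ${archivalResponse.body}
```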
Workflow Job Graph
In this post, we discussed how Workflows can be used to orchestrate data pipelines on GCP. Workflows’ rapid scaling, out-of-the-box exception handling, built-in authentication, and integration with Cloud Logging make it a great fit for serverless implementations.
Interested in exploring more Workflows use cases?
To get started with Workflows, refer to our quickstart guides and tutorials. Also, check out the newly released Workflows connectors with built-in authentication and error handling that can be used to seamlessly integrate with Google Cloud APIs.