Creating a reusable pipeline

This tutorial shows how to build a reusable pipeline that reads data from Cloud Storage, performs data quality checks, and writes to Cloud Storage.

Reusable pipelines have a regular pipeline structure, but you can change the configuration of each pipeline node based on configurations provided by an HTTP server. For example, a static pipeline might read data from Cloud Storage, apply transformations, and write to a BigQuery output table. If you want the transformation and BigQuery output table to change based on the Cloud Storage file that the pipeline reads, you create a reusable pipeline.
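
For example, the same deployed pipeline can read a different file and apply different checks on each run simply by being pointed at a different arguments payload. The following is a minimal, hypothetical payload for a single run (the bucket and object names are placeholders); it uses the same format as the arguments file that this tutorial reads later:

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://example-bucket/2024-06-01/records.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }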

Objectives

  • Use the Cloud Storage Argument Setter plugin to allow the pipeline to read different input in every run.
  • Use the Cloud Storage Argument Setter plugin to allow the pipeline to perform different quality checks in every run.
  • Write the output data of each run to Cloud Storage.

Costs

In this document, you use the following billable components of Google Cloud:

  • Cloud Data Fusion
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Data Fusion, Cloud Storage, BigQuery, and Dataproc APIs.

    Enable the APIs

  5. Create a Cloud Data Fusion instance.

When using Cloud Data Fusion, you use both the Google Cloud console and the separate Cloud Data Fusion web interface. In the Google Cloud console, you can create a Google Cloud project, and create and delete Cloud Data Fusion instances. In the Cloud Data Fusion web interface, you work in pages such as Pipeline Studio and Wrangler to use Cloud Data Fusion features.

  1. In the Google Cloud console, open the Instances page.

    Open the Instances page

  2. In the Actions column for the instance, click the View Instance link. The Cloud Data Fusion web interface opens in a new browser tab.

Deploy the Cloud Storage Argument Setter plugin

  1. In the Cloud Data Fusion web interface, go to the Studio page.

  2. In the Actions menu, click GCS Argument Setter.
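
You can optionally confirm that the plugin deployed without clicking through the web interface by querying the CDAP REST API that your Cloud Data Fusion instance exposes. The following Python sketch is only an illustration under assumptions: API_ENDPOINT is a placeholder for your instance's API endpoint (shown on the instance details page), and matching on the word "argument" is just a heuristic, since the exact artifact name depends on the plugin version you deployed.

    import google.auth
    import google.auth.transport.requests
    import requests

    # Placeholder: copy the API endpoint from your instance's details page.
    API_ENDPOINT = "https://INSTANCE-REGION-PROJECT.datafusion.googleusercontent.com/api"

    # Authenticate with Application Default Credentials.
    credentials, _ = google.auth.default()
    credentials.refresh(google.auth.transport.requests.Request())

    # List the artifacts deployed in the default namespace.
    response = requests.get(
        f"{API_ENDPOINT}/v3/namespaces/default/artifacts",
        headers={"Authorization": f"Bearer {credentials.token}"},
    )
    response.raise_for_status()

    # Print any artifact whose name mentions "argument".
    for artifact in response.json():
        if "argument" in artifact["name"].lower():
            print(artifact["name"], artifact["version"])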

Read from Cloud Storage

  1. In the Cloud Data Fusion web interface, go to the Studio page.
  2. Click Source and select Cloud Storage. The node for a Cloud Storage source appears in the pipeline.
  3. On the Cloud Storage node, click Properties.

  4. In the Reference name field, enter a name.

  5. In the Path field, enter ${input.path}. This macro controls the Cloud Storage input path in each pipeline run.

  6. In the Output Schema panel on the right, remove the offset field from the output schema by clicking the trash icon in its row.

  7. Click Validate and address any errors.

  8. Click the Close button to exit the Properties dialog.

Transform your data

  1. In the Cloud Data Fusion web interface, go to your data pipeline on the Studio page.
  2. In the Transform drop-down menu, select Wrangler.
  3. In the Pipeline Studio canvas, drag an arrow from the Cloud Storage node to the Wrangler node.
  4. Go to the Wrangler node in your pipeline and click Properties.
  5. In the Input field name field, enter body.
  6. In the Recipe field, enter ${directives}. This macro controls the transformation logic in each pipeline run; the example after this list shows how a recipe with multiple directives can be supplied through this macro.
  7. Click Validate and address any errors.
  8. Click the Close button to exit the Properties dialog.
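
The ${directives} macro can receive an entire Wrangler recipe, not just a single directive. Directives are newline-separated, so a multi-directive recipe in an arguments file joins them with \n. The entry below is a hypothetical example (trim body is only an illustrative extra directive; check the Wrangler directive reference for the exact syntax your environment expects):

    {
      "name": "directives",
      "value": "trim body\nsend-to-error !dq:isEmail(body)"
    }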

Write to Cloud Storage

  1. In the Cloud Data Fusion web interface, go to your data pipeline on the Studio page.
  2. In the Sink drop-down menu, select Cloud Storage.
  3. In the Pipeline Studio canvas, drag an arrow from the Wrangler node to the Cloud Storage node that you just added.
  4. Go to the Cloud Storage sink node in your pipeline and click Properties.
  5. In the Reference name field, enter a name.
  6. In the Path field, enter the path of a Cloud Storage bucket in your project, where your pipeline can write the output files. If you don't have a Cloud Storage bucket, create one.
  7. Click Validate and address any errors.
  8. Click the Close button to exit the Properties dialog.

Set the macro arguments

  1. In the Cloud Data Fusion web interface, go to your data pipeline on the Studio page.
  2. In the Conditions and Actions drop-down menu, click GCS Argument Setter.
  3. In the Pipeline Studio canvas, drag an arrow from the Cloud Storage Argument Setter node to the Cloud Storage source node.
  4. Go to the Cloud Storage Argument Setter node in your pipeline and click Properties.
  5. In the URL field, enter the following URL:

    gs://reusable-pipeline-tutorial/args.json
    

    The URL corresponds to a publicly accessible object in Cloud Storage that contains the following content:

    {
      "arguments" : [
        {
          "name": "input.path",
          "value": "gs://reusable-pipeline-tutorial/user-emails.txt"
        },
        {
          "name": "directives",
          "value": "send-to-error !dq:isEmail(body)"
        }
      ]
    }
    

    The first of the two arguments is the value for input.path. The path gs://reusable-pipeline-tutorial/user-emails.txt is a publicly accessible object in Cloud Storage that contains the following test data:

    alice@example.com
    bob@example.com
    craig@invalid@example.com
    

    The second argument is the value for directives. The value send-to-error !dq:isEmail(body) configures Wrangler to filter out any lines that are not a valid email address. For example, craig@invalid@example.com is filtered out.

  6. Click Validate to make sure you don't have any errors.

  7. Click the Close button to exit the Properties dialog.
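
This tutorial points the Cloud Storage Argument Setter at a publicly hosted arguments file. To run the pipeline against your own data, you can host a similar file in a bucket that you control and enter its gs:// path in the URL field. The following Python sketch shows one way to generate and upload such a file, assuming the google-cloud-storage client library is installed; the bucket name is a placeholder, and the service account that runs the pipeline must be able to read the object.

    import json

    from google.cloud import storage

    BUCKET_NAME = "my-reusable-pipeline-args"  # placeholder: a bucket you own

    arguments = {
        "arguments": [
            {"name": "input.path",
             "value": f"gs://{BUCKET_NAME}/user-emails.txt"},
            {"name": "directives",
             "value": "send-to-error !dq:isEmail(body)"},
        ]
    }

    # Upload the arguments file; the Argument Setter's URL field would then be
    # set to gs://<BUCKET_NAME>/args.json.
    client = storage.Client()
    blob = client.bucket(BUCKET_NAME).blob("args.json")
    blob.upload_from_string(json.dumps(arguments, indent=2),
                            content_type="application/json")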

Deploy and run your pipeline

  1. In the top bar of the Pipeline Studio page, click Name your pipeline, enter a name for the pipeline, and then click Save.

  2. Click Deploy.

  3. To open the runtime arguments and view the macro (runtime) arguments input.path and directives, click the drop-down arrow next to Run.

    Leave the value fields blank to notify Cloud Data Fusion that the Cloud Storage Argument Setter node in the pipeline will set the values of these arguments during runtime.

  4. Click Run.
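
You can also start the deployed pipeline without the web interface, for example from a scheduler or another service, by calling the CDAP REST API that the instance exposes. The sketch below is an illustration under assumptions, not the documented interface: API_ENDPOINT and PIPELINE_NAME are placeholders, and the empty request body plays the same role as leaving the runtime argument values blank, so the Cloud Storage Argument Setter supplies input.path and directives at runtime.

    import google.auth
    import google.auth.transport.requests
    import requests

    # Placeholders: the API endpoint from the instance details page and the
    # name you gave the pipeline in Pipeline Studio.
    API_ENDPOINT = "https://INSTANCE-REGION-PROJECT.datafusion.googleusercontent.com/api"
    PIPELINE_NAME = "reusable-pipeline"

    credentials, _ = google.auth.default()
    credentials.refresh(google.auth.transport.requests.Request())

    # Batch data pipelines run as the DataPipelineWorkflow program in CDAP.
    start_url = (f"{API_ENDPOINT}/v3/namespaces/default/apps/{PIPELINE_NAME}"
                 "/workflows/DataPipelineWorkflow/start")

    # An empty body passes no runtime arguments here; the Argument Setter node
    # sets input.path and directives when the pipeline runs.
    response = requests.post(
        start_url,
        headers={"Authorization": f"Bearer {credentials.token}"},
        json={},
    )
    response.raise_for_status()
    print("Pipeline start requested")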

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

After you've finished the tutorial, clean up the resources you created on Google Cloud so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Delete the Cloud Data Fusion instance

Follow the instructions to delete your Cloud Data Fusion instance.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next