This tutorial shows how to build a reusable pipeline that reads data from Cloud Storage, performs data quality checks, and writes to Cloud Storage.
Reusable pipelines have a regular pipeline structure, but you can change the configuration of each pipeline node based on configurations provided by an HTTP server. For example, a static pipeline might read data from Cloud Storage, apply transformations, and write to a BigQuery output table. If you want the transformation and BigQuery output table to change based on the Cloud Storage file that the pipeline reads, you create a reusable pipeline.
Objectives
- Use the Argument Setter plugin to allow the pipeline to read different input in every run.
- Use the Argument Setter plugin to allow the pipeline to perform different quality checks in every run.
- Write the output data of each run to Cloud Storage.
Costs
In this document, you use the following billable components of Google Cloud:
- Cloud Data Fusion
- Cloud Storage
To generate a cost estimate based on your projected usage,
use the pricing calculator.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Cloud Data Fusion, Cloud Storage, BigQuery, and Dataproc APIs.
- Create a Cloud Data Fusion instance.
Navigate to the Cloud Data Fusion web interface
When using Cloud Data Fusion, you use both the Google Cloud console and the separate Cloud Data Fusion web interface. In the Google Cloud console, you can create a Google Cloud project, and create and delete Cloud Data Fusion instances. In the Cloud Data Fusion web interface, you can use the various pages, such as Pipeline Studio or Wrangler, to use Cloud Data Fusion features.
In the Google Cloud console, open the Instances page.
In the Actions column for the instance, click the View Instance link. The Cloud Data Fusion web interface opens in a new browser tab.
Deploy the Argument Setter plugin
In the Cloud Data Fusion web interface, click Hub in the upper right.
Click the Argument setter action plugin and click Deploy.
In the Deploy window that opens, click Finish.
Click Create a pipeline. The Pipeline Studio page opens.
Read from Cloud Storage
- On the Pipeline Studio page, click the arrow_drop_down Source and select Google Cloud Storage.
- Hover over the Cloud Storage source card and click the Properties
button that appears.
- In the Reference name field, enter a name.
- In the Path field, enter `${input.path}`. This macro controls what the Cloud Storage input path will be in the different pipeline runs.
- In the right Output Schema panel, remove the offset field from the output schema by clicking the trash icon in the offset field row.
- Click Validate to make sure you don't have any errors.
- Click the X to exit the Properties dialog.
Transform your data
- In the left panel of the Pipeline Studio page, using the Transform drop-down menu arrow_drop_down, select Wrangler.
- In the Pipeline Studio canvas, drag an arrow from the Cloud Storage card
to the Wrangler card.
- Point to the Wrangler card and click the Properties button that appears.
- In the Input field name field, enter `body`.
- In the Recipe field, enter `${directives}`. This macro controls what the transform logic will be in the different pipeline runs (see the sketch after this list for an illustration of the check used in this tutorial).
- Click Validate to make sure you don't have any errors.
- Click the X to exit the Properties dialog.
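The recipe itself is supplied at run time by the Argument Setter, so this node stays generic. To build intuition for the kind of check this tutorial passes in (filtering out rows that are not valid email addresses), here is a rough, self-contained Python sketch. It only mimics the behavior of the `send-to-error !dq:isEmail(body)` directive used later in this tutorial; the regular expression is an assumption, and Wrangler's own `dq:isEmail` check may treat edge cases differently.

```python
import re

# Loose email pattern used only for this illustration; Wrangler's dq:isEmail
# check runs inside CDAP and may treat edge cases differently.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def split_valid_and_error(lines):
    """Mimic `send-to-error !dq:isEmail(body)`: keep valid emails, divert the rest."""
    valid, errors = [], []
    for body in lines:
        (valid if EMAIL_RE.match(body) else errors).append(body)
    return valid, errors

valid, errors = split_valid_and_error([
    "alice@example.com",
    "bob@example.com",
    "craig@invalid@example.com",  # two @ signs, so it fails the check
])
print(valid)   # ['alice@example.com', 'bob@example.com']
print(errors)  # ['craig@invalid@example.com']
```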
Write to Cloud Storage
- In the left panel of the Pipeline Studio page, using the Sink drop-down menu arrow_drop_down, select Cloud Storage.
- In the Pipeline Studio canvas, drag an arrow from the Wrangler card to the
Cloud Storage card you just added.
- Point to the Cloud Storage sink card and click the Properties button that appears.
- In the Reference name field, enter a name.
- In the Path field, enter the path of a Cloud Storage bucket in your project where your pipeline can write the output files. If you don't have a Cloud Storage bucket, create one (a scripted option is sketched after this list).
- Click Validate to make sure you don't have any errors.
- Click the X to exit the Properties dialog.
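If you don't already have a bucket for the sink, one way to create it is with the google-cloud-storage Python client, as in the sketch below; the bucket name and location are placeholders, and creating the bucket from the console or with `gsutil mb` works just as well.

```python
from google.cloud import storage  # pip install google-cloud-storage

# Placeholder bucket name: bucket names are globally unique, so pick your own.
client = storage.Client()
bucket = client.create_bucket("my-reusable-pipeline-output", location="US")
print(f"Created gs://{bucket.name}")
```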
Set the macro arguments
- In the left panel of the Pipeline Studio page, using the Conditions and Actions drop-down menu arrow_drop_down, select the Argument Setter plugin.
- In the Pipeline Studio canvas, drag an arrow from the Argument Setter card to
the Cloud Storage source card.
- Point to the Argument Setter card and click the Properties button that appears.
In the URL field, enter the following:

    gs://reusable-pipeline-tutorial/args.json

The URL corresponds to a publicly accessible object in Cloud Storage that contains the following content:

    {
      "arguments": [
        { "name": "input.path", "value": "gs://reusable-pipeline-tutorial/user-emails.txt" },
        { "name": "directives", "value": "send-to-error !dq:isEmail(body)" }
      ]
    }

The first of the two arguments is the value for `input.path`. The path `gs://reusable-pipeline-tutorial/user-emails.txt` is a publicly accessible object in Cloud Storage that contains the following test data:

    alice@example.com
    bob@example.com
    craig@invalid@example.com

The second argument is the value for `directives`. The value `send-to-error !dq:isEmail(body)` configures Wrangler to filter out any lines that are not a valid email address. For example, `craig@invalid@example.com` is filtered out.

Click Validate to make sure you don't have any errors.
Click the X to exit the Properties dialog.
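This tutorial points the Argument Setter at a public, Google-hosted args.json. To reuse the pipeline with your own data and directives, you can host an equivalent file in a bucket you control and put its URL in the Argument Setter's URL field. Below is a minimal sketch using the google-cloud-storage client; the bucket name and input object are placeholders, and making the object readable by the pipeline is up to you.

```python
import json

from google.cloud import storage  # pip install google-cloud-storage

# Placeholder name: use a bucket you own and your own newline-delimited input file.
BUCKET = "my-reusable-pipeline-bucket"

arguments = {
    "arguments": [
        {"name": "input.path", "value": f"gs://{BUCKET}/user-emails.txt"},
        {"name": "directives", "value": "send-to-error !dq:isEmail(body)"},
    ]
}

client = storage.Client()
blob = client.bucket(BUCKET).blob("args.json")
blob.upload_from_string(json.dumps(arguments), content_type="application/json")
print(f"Uploaded gs://{BUCKET}/args.json")
```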
Deploy and run your pipeline
In the top bar of the Pipeline Studio page, click Name your pipeline. Name your pipeline and click Save.
Click Deploy.
To open the Runtime Arguments and view the macro (runtime) `input.path` and `directives` arguments, click the arrow_drop_down drop-down next to Run. Leave the value fields blank to notify Cloud Data Fusion that the Argument Setter node in the pipeline will set the values of these arguments during runtime.
Click Run.
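Clicking Run starts the pipeline from the web interface. If you prefer to trigger runs programmatically (for example, from a scheduler), you can call the CDAP REST API that Cloud Data Fusion exposes at your instance's API endpoint. The sketch below assumes a batch pipeline deployed from Pipeline Studio, which runs as the DataPipelineWorkflow program; the endpoint and pipeline name are placeholders, and the runtime arguments are left empty because the Argument Setter node supplies them.

```python
import google.auth
import requests  # pip install requests
from google.auth.transport.requests import Request

# Placeholder values: your instance's API endpoint (shown on the instance details
# page or by `gcloud beta data-fusion instances describe`) and the name you gave
# the pipeline when you deployed it.
CDAP_ENDPOINT = "https://my-instance-my-project-dot-usc1.datafusion.googleusercontent.com/api"
PIPELINE_NAME = "reusable-pipeline"

# Get a bearer token from Application Default Credentials.
credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
credentials.refresh(Request())

# Batch pipelines deployed from Pipeline Studio run as the DataPipelineWorkflow
# program. Runtime arguments are empty because the Argument Setter node supplies
# input.path and directives during the run.
response = requests.post(
    f"{CDAP_ENDPOINT}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
    headers={"Authorization": f"Bearer {credentials.token}"},
    json={},
)
response.raise_for_status()
print("Pipeline run started")
```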
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
After you've finished the tutorial, clean up the resources you created on Google Cloud so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.
Delete the Cloud Data Fusion instance
Follow the instructions to delete your Cloud Data Fusion instance.
Delete the project
The easiest way to eliminate billing is to delete the project that you created for the tutorial.
To delete the project:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
What's next
- Read the how-to guides
- Work through another tutorial