Targeting campaign pipeline

This tutorial shows you how to use the Wrangler and Data Pipelines features in Cloud Data Fusion to clean, transform, and process customer data to select candidates for a targeting campaign.


You want to create custom marketing materials for an ongoing campaign promotion, and you want drivers to distribute the materials directly to customers' home mailboxes.

Your campaign has two constraints:

  • Location: You only deliver to customers in California, Washington, and Oregon.
  • Cost: To save on fuel, you only deliver to customers who live on Avenues (not Roads or Courts), where cars can easily reach customers' homes.

To generate the list of customer addresses for the campaign, you use:

  1. Wrangler to clean the data and apply filters for customers in California, Washington, and Oregon who live on an Avenue.
  2. Data Pipeline to create a pipeline that:
    • Joins the customer data with a public dataset in BigQuery that contains state abbreviations.
    • Stores the cleaned and joined data in a BigQuery table that you can query by using the BigQuery web UI or analyze by using Data Studio.


Objectives

  • Connect Cloud Data Fusion to two data sources
  • Apply basic transformations
  • Join the two data sources
  • Write data to a sink


Costs

This tutorial uses billable components of GCP, including:

  • Cloud Data Fusion
  • Cloud Dataproc
  • BigQuery

Each run of this tutorial uses a 3-node (1 master, 2 workers) Cloud Dataproc cluster that runs for about 6 minutes (0.1 hours) to write about 6 MB of data into BigQuery. Because you store only a small amount of data in BigQuery, this estimate ignores the BigQuery cost. Based on these numbers, the cost of running this pipeline a single time is approximately:

Cloud Data Fusion cost + Cloud Dataproc cost
= (num_hours * hourly cost of Cloud Data Fusion) + (num_hours * hourly cost of Cloud Dataproc)
= (0.1 * 1.8) + (0.1 * 0.01)
= 0.18 + 0.001
~ 18 cents
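
The same arithmetic can be written as a short Python sketch, which makes it easy to plug in a different run time or different rates. The hourly rates below are the illustrative figures from the estimate above, not current list prices, and the function name is hypothetical.

```python
# Figures taken from the estimate above; treat the rates as illustrative.
RUN_MINUTES = 6
DATA_FUSION_HOURLY_USD = 1.8
DATAPROC_HOURLY_USD = 0.01


def estimate_run_cost(minutes, fusion_rate, dataproc_rate):
    """Return the approximate cost in USD of a single pipeline run."""
    hours = minutes / 60
    return hours * fusion_rate + hours * dataproc_rate


cost = estimate_run_cost(RUN_MINUTES, DATA_FUSION_HOURLY_USD, DATAPROC_HOURLY_USD)
print(f"Estimated cost per run: ${cost:.3f}")
```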

Use the pricing calculator to generate a cost estimate based on your projected usage. New GCP users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud Platform project. Learn how to enable billing.

  4. Enable the Cloud Data Fusion, Cloud Storage, BigQuery, and Cloud Dataproc APIs.

    Enable the APIs

  5. Create a Cloud Data Fusion instance.

Preparing the customer data

This tutorial requires the following two input datasets, which are pre-configured in your Cloud Data Fusion instance in the Wrangler UI:

  1. Sample customer data: Available in a bucket named campaign-tutorial as a CSV file customers.csv in the Cloud Storage connection Sample Buckets.
  2. State abbreviations: Available as a BigQuery table state_abbreviations in the campaign_tutorial dataset in the BigQuery connection Sample Datasets.

Loading the customer data

The data for this tutorial is in a Cloud Storage bucket. The Cloud Storage bucket is publicly available through the Sample Buckets connection and is added to your Cloud Data Fusion installation by default.

  1. Go to the Cloud Data Fusion UI.
  2. On the Cloud Data Fusion top menu bar, choose Wrangler.

    On the left side is a panel with the pre-configured connections to your data, including the Cloud Storage connection.

  3. Under Cloud Storage, select the Sample Buckets connection.

    On the right side is a list of buckets in the Cloud Storage connection, including campaign-tutorial.

  4. Select campaign-tutorial > customers.csv.

    The customer data loads into the Wrangler screen in row/column form.

Cleaning the customer data

You'll now perform some transformations to parse and clean the customer data.

Transforming the data

  1. To the left of the body column, click the Down arrow.
  2. Click Parse > CSV and click Apply. The data splits into multiple columns.
  3. Because the body column isn't needed anymore, click the Down arrow next to the body column and choose Delete column.

    You can now set the schema of the data by assigning appropriate names to the remaining columns.

  4. In the columns tab on the right, click the Column names Down arrow and select Set all.

    Tip: You can also set the column names individually by clicking each column's name in the table and entering a new name.

  5. To set the schema, insert the following comma-separated text in the modal and then click Apply: Name,StreetAddress,City,State,Country.
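
For readers who think in code, the steps above (parse the body column as CSV, drop it, then name the columns) correspond roughly to the following Python sketch. It is not what Wrangler runs internally. The sample rows are hypothetical stand-ins for customers.csv, and only the column names come from the tutorial.

```python
import csv
import io

# Column names from step 5 of the tutorial.
COLUMN_NAMES = ["Name", "StreetAddress", "City", "State", "Country"]

# Hypothetical rows standing in for the raw "body" text of customers.csv.
SAMPLE_BODY = (
    "Jane Doe,61 Summit Avenue,Portland,Oregon,USA\n"
    "John Roe,12 Hill Road,Austin,Texas,USA\n"
)


def parse_customers(body_text):
    """Split each raw line into fields and label them with the schema."""
    reader = csv.reader(io.StringIO(body_text))
    # The raw body text is not kept, which mirrors deleting the body column.
    return [dict(zip(COLUMN_NAMES, row)) for row in reader]


customers = parse_customers(SAMPLE_BODY)
for customer in customers:
    print(customer)
```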

You've performed some basic transformations. Now you can add some filters to narrow down the target audience of the campaign using the selection criteria described at the beginning of the tutorial.

Filtering the data

Choosing the correct states: Because your campaign targets customers only in California, Oregon, or Washington, you need to remove all rows that contain values other than the ones you want.

  1. To the left of the State column, click the Down arrow and choose Filter.
  2. In the filter pane, click Keep Rows and in the drop-down list, select if value matches regex.
  3. Enter the following regular expression: ^(California|Oregon|Washington)$.
  4. Click Apply to filter.

    Only the states that match the desired condition remain.
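
To see what this regular expression keeps and discards, here is a minimal Python sketch that applies the same pattern to a few hypothetical rows. The anchors ^ and $ mean the whole value must be one of the three state names, so a value such as "West Washington" would not match.

```python
import re

# The regular expression from step 3 of the tutorial.
STATE_PATTERN = re.compile(r"^(California|Oregon|Washington)$")

# Hypothetical rows; only the State field matters for this filter.
rows = [
    {"Name": "Jane Doe", "State": "Oregon"},
    {"Name": "John Roe", "State": "Texas"},
    {"Name": "Ann Poe", "State": "West Washington"},
]


def keep_target_states(records):
    """Keep only the records whose State matches the pattern exactly."""
    return [r for r in records if STATE_PATTERN.match(r["State"])]


kept = keep_target_states(rows)
print([r["Name"] for r in kept])
```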

Choosing the correct street type: Because it's more fuel and cost efficient to deliver to addresses that are centrally located and easy to access by car, you want to keep only the addresses that contain the word Avenue, such as 61 Summit Avenue.

  1. To the left of the StreetAddress column, click the Down arrow and choose Filter.
  2. Click if Value Contains and enter Avenue.
  3. Select Ignore case.
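
The "contains, ignoring case" filter can be pictured as a case-insensitive substring test. The sketch below is a plain Python approximation with hypothetical rows, apart from the 61 Summit Avenue example mentioned above.

```python
# Hypothetical rows; only the StreetAddress field matters for this filter.
rows = [
    {"Name": "Jane Doe", "StreetAddress": "61 Summit Avenue"},
    {"Name": "John Roe", "StreetAddress": "12 Hill Road"},
    {"Name": "Ann Poe", "StreetAddress": "7 OAK AVENUE"},
]


def keep_avenues(records, needle="Avenue"):
    """Keep records whose street address contains the needle, ignoring case."""
    needle = needle.lower()
    return [r for r in records if needle in r["StreetAddress"].lower()]


on_avenues = keep_avenues(rows)
print([r["StreetAddress"] for r in on_avenues])
```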

Wrangler is an interactive, visual tool that lets you see the effects of transformations on a small subset of your data before dispatching large, parallel-processing jobs on the entire dataset. Because Wrangler shows only the first 1000 values from your dataset, only a few customers remain after filtering.

Creating the pipeline

Basic data cleansing is done, and you've run transformations on a subset of your data. You can now create a batch pipeline to run transformations on all your data.

  1. On the upper-right side, click Create Pipeline.
  2. In the dialog that appears, select Batch pipeline.

    Once the pipeline studio appears, in the upper left, you should see that Data Pipeline - Batch is selected as the pipeline type.

    In the Data Pipelines UI, you will see a GCSFile source node (with the annotation customer) connected into a Wrangler node.

    The Wrangler node contains all the transformations you applied in the Wrangler view captured as a directive grammar.

  3. At this stage, you can apply more transformations by clicking the Wrangle button.

    For example, because the delivery vehicle's navigation system will be the data consumer and does not require the Country, you can remove the column.

  4. To delete the Country column, click Wrangle.
  5. Click the Down arrow next to Country and choose Delete Column.

    The column that contains the USA value disappears.

  6. Click Apply.
  7. To close the Wrangler area, click the X button.

Abbreviating the state names

The delivery vehicle's navigation system only recognizes addresses that contain abbreviated state names (CA, not California). Currently, your customer data contains full state names.

The public state_abbreviations BigQuery table contains two columns: one with the full state names and one with the abbreviated state names. You'll use this table to update the state names in your customer data.

Viewing the state names data in BigQuery

To view the data in the BigQuery UI:

  1. In a separate tab, open the BigQuery UI in the GCP Console.

    Go to the BigQuery UI

  2. To view the contents of the table state_abbreviations, enter the following query in the Query Editor and then click Run Query: SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`

    You can see that this table contains the names of all states in the United States and their abbreviations.

Accessing the BigQuery table

You'll add a source in your pipeline to access this BigQuery table.

  1. In the Cloud Data Fusion tab, from the Plugin palette on the left, choose the BigQuery source from the Source section.

    A BigQuery source node appears on the canvas with the two other nodes.

  2. Point to the BigQuery source node and click Properties.
  3. To configure the Reference Name, enter state_abbreviations, which is used to identify this data source for lineage purposes.

    The BigQuery Dataset and Table configurations are the Dataset and Table names as found in BigQuery.

  4. To populate the schema of this table from BigQuery, click Get Schema.
  5. To save this schema and close the BigQuery Source Node, click Apply.

Joining the two data sources

Now you can join the two data sources—customer data and state abbreviations—to generate output that contains customer data with abbreviated state names.

  1. Under the Analytics section in the Plugin Palette, choose Joiner.

    A Joiner node appears on the canvas.

  2. To connect the Wrangler node and the BigQuery node to the Joiner node: Drag a connection arrow > on the right edge of the source node and drop on the destination node.
  3. To configure the Joiner node, which works much like a SQL JOIN:

    1. Select the columns to include from both inputs.
    2. Add an alias to any duplicate column names.
    3. Because you want only the abbreviated state name and not the full name, exclude the State field from the Wrangler node and the name field from the State Abbreviations node.

      The abbreviated state name is present in the abbreviation field in the State Abbreviations node.

    4. Add the alias State to the abbreviation field so that it is easily identifiable.

    5. Leave the Join type as Outer and set the Join condition to join the State column in the Wrangler node to the name column in the State Abbreviations node.

    6. To generate the schema of the resultant join, click Generate Schema and then click Apply.
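
The Joiner configuration above can be approximated in plain Python as follows. This sketch reads "Outer" as a full outer join, which is an assumption, because which inputs the Joiner treats as required is not stated here. It also assumes each state name appears once in the abbreviations table. The sample rows are hypothetical, while the field names name and abbreviation come from the steps above.

```python
# Hypothetical rows standing in for the Wrangler output and the BigQuery table.
customers = [
    {"Name": "Jane Doe", "StreetAddress": "61 Summit Avenue",
     "City": "Portland", "State": "Oregon"},
]
abbreviations = [
    {"name": "Oregon", "abbreviation": "OR"},
    {"name": "California", "abbreviation": "CA"},
]


def outer_join(customer_rows, abbreviation_rows):
    """Join on customers.State == abbreviations.name, keeping unmatched rows."""
    by_name = {a["name"]: a["abbreviation"] for a in abbreviation_rows}
    matched = set()
    joined = []
    for c in customer_rows:
        abbr = by_name.get(c["State"])
        if abbr is not None:
            matched.add(c["State"])
        # The abbreviation is output under the alias State; the full
        # state name and the name field are excluded from the output.
        joined.append({"Name": c["Name"], "StreetAddress": c["StreetAddress"],
                       "City": c["City"], "State": abbr})
    # Abbreviation rows with no matching customer get empty customer fields.
    for name, abbr in by_name.items():
        if name not in matched:
            joined.append({"Name": None, "StreetAddress": None,
                           "City": None, "State": abbr})
    return joined


joined = outer_join(customers, abbreviations)
for row in joined:
    print(row)
```

Rows that have no customer fields, like the California row in this example, are a side effect of the full outer join. Whether such rows reach the sink in the real pipeline depends on how the Joiner is configured.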

You're ready to proceed to the final step of adding a sink to this pipeline.

Storing the output to BigQuery

You'll store the result of your pipeline in a BigQuery table. The destination where a pipeline writes its data is called a sink.

  1. In the Sink section of the Plugin Palette, choose BigQuery Table.
  2. Connect the Joiner node to the BigQuery node.
  3. Open the BigQuery node and configure it.

    Use a configuration similar to the existing BigQuery source, except specify a different table that doesn't exist yet.

  4. Name the pipeline CampaignPipeline.

That's it. You've created your first pipeline and can deploy and run the pipeline.

Deploying and running the pipeline

To deploy CampaignPipeline:

  1. In the upper-right corner, click Deploy.
  2. Once deployed, click Run and wait for the pipeline to run to completion.

Viewing the results

To view the results after the pipeline runs:

  1. Query the campaign_targets table in the BigQuery UI.

    Go to the BigQuery UI

  2. Update the project name in the query below to your own project name.
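
The query text is not reproduced in this copy of the tutorial. As a sketch, it would plausibly select everything from the campaign_targets table. The helper below builds such a query. The dataset name dis_user_guide is taken from the cleanup section and is an assumption here, as are the function name and the placeholder project my-project.

```python
def results_query(project_id, dataset="dis_user_guide", table="campaign_targets"):
    """Build a query that selects all rows from the pipeline's output table."""
    # Backticks quote the fully qualified table name, as in the earlier
    # state_abbreviations query in this tutorial.
    return f"SELECT * FROM `{project_id}.{dataset}.{table}`"


# Replace the hypothetical my-project with your own project name.
query = results_query("my-project")
print(query)
```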

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, clean up the resources that you created on GCP after you've finished, so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Manage resources page.

    Go to the Manage resources page

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.

Deleting the BigQuery dataset

To delete the BigQuery dataset you created as part of this tutorial:

  1. In the GCP Console, go to the BigQuery page.

    Go to the BigQuery page

  2. Select the dis_user_guide dataset.
  3. To delete the dataset, click DELETE DATASET toward the mid-right side of the page.
