Targeting campaign pipeline

Stay organized with collections Save and categorize content based on your preferences.

This tutorial shows you how to use Cloud Data Fusion to clean, transform, and process customer data to select candidates for a targeting campaign.


You want to create custom marketing materials for an ongoing campaign promotion, and you'd like to distribute the materials directly to your customers' home mailboxes.

Your campaign has two constraints:

  • Location: You only deliver to customers in California, Washington, and Oregon.
  • Cost: To save on fuel, you deliver to easily accessible customer homes. You deliver only to customers who live on avenues.

This tutorial shows you how to generate the list of customer addresses for the campaign. In this tutorial, you do the following:

  1. Clean the customer data: filter customers that live on an avenue in California, Washington, or Oregon.
  2. Create a pipeline that does the following:
    • Joins the filtered customer data with a public dataset that contains state abbreviations.
    • Stores the cleaned and joined data in a BigQuery table that you can query (by using the BigQuery web UI) or analyze (by using Data Studio).


  • Connect Cloud Data Fusion to two data sources
  • Apply basic transformations
  • Join the two data sources
  • Write the output data to a sink


This tutorial uses the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Each run of this tutorial runs a 3-node (1 master, 2 workers) Dataproc cluster that runs for about 6 minutes to write about 6 MB of data into BigQuery. Since you will store only a small amount of data in BigQuery, we will ignore the BigQuery cost as part of this estimate. Based on these numbers, your cost of running this pipeline a single time can be approximated as:

total cost = Cloud Data Fusion cost + Dataproc cost

Each of the components in this formula can be broken down as follows:

Cloud Data Fusion cost = (time * Cloud Data Fusion rate)

Dataproc cost = (time * number of VMs * (Compute Engine rate + Dataproc rate))

For example, consider this cost estimate for a 6-minute job using the default Compute Engine profile in Cloud Data Fusion, with 3 n1-standard4 VMs in the us-west1 region:

  • time = 0.1 hours
  • Data Fusion rate = $1.8/hour
  • number of VMs = 3
  • Compute Engine rate = $0.19/hour/VM
  • Dataproc rate = $0.01/hour/VM

Using these values in the cost formula gives the following total cost for this example:

(0.1 * 1.8) + (0.1 * 3 * (0.1900 + 0.01)) = $0.24 (24 cents)

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  6. Enable the Cloud Data Fusion, Cloud Storage, BigQuery and Cloud Dataproc APIs.

    Enable the APIs

  7. Create a Cloud Data Fusion instance.

Prepare the customer data

This tutorial requires the following two input datasets, both of which are provided with your Cloud Data Fusion instance:

  • Sample customer data: A CSV file named customers.csv.
  • State abbreviations: A BigQuery table named state_abbreviations.

Load the customer data

  1. Go to the Cloud Data Fusion web UI.
  2. Navigate to the Wrangler page of the web UI.
  3. In the left panel, under Google Cloud Storage, click Sample Buckets.
  4. Click campaign-tutorial.
  5. Click customers.csv. The customer data appears.

Clean the customer data

Parse the customer data into table format, set the schema, and filter the customer data to present only the target audience you need.

Parse the data

  1. Click the body column drop-down.
  2. Select Parse > CSV.
  3. With the delimiter selected as comma, click Apply. image
  4. The data splits into multiple columns, so the original body column is no longer needed. Click the body column drop-down and select Delete column.

Set the schema

Set the schema of the data by assigning appropriate names to the table columns. Instead of bodyXX, rename the columns to capture the information they present.

  1. In the Columns tab on the right, click the Column names drop-down and select Set all.
  2. In the Bulk set column names dialog box, enter the following comma-separated text: Name,StreetAddress,City,State,Country.
  3. Click Apply.

Filter the data

Filter the data to display only customers that live in California, Oregon, or Washington. To do this, remove all rows that contain values other than these three states.

  1. Click the State column drop-down.
  2. Select Filter.
  3. In the filter window, do the following:

    1. Click Keep rows.
    2. In the If drop-down, select value matches regex.
    3. Enter the following regular expression: ^(California|Oregon|Washington)$.
    4. Click Apply.

    The values in the State column are "California", "Oregon", or "Washington".

Filter the data to display only customers that live on avenues. To do this, keep only the addresses that contain the string 'avenue'.

  1. To the left of the StreetAddress column, click the down arrow and select Filter.
  2. In the If drop-down, select value contains and enter Avenue.
  3. Select Ignore case.

Before performing parallel-processing jobs on your entire dataset, Wrangler displays only the first 1000 values of your dataset. Because you filtered some data, only a few customers remain in the Wrangler display.

Create the pipeline

You've cleaned your data and you've run transformations on a subset of your data. You can now create a batch pipeline to run transformations on your entire dataset.

  1. In Wrangler, on the upper-right side, click Create pipeline.
  2. Select Batch pipeline. You are taken to the Pipeline Studio page.
  3. In the upper left, make sure Data Pipeline - Batch is displayed as the pipeline type.

In the Pipeline Studio canvas, a GCSFile source node is connected to a Wrangler node.

All the transformations you applied in the Wrangler view appear in the Wrangler node of the Pipeline Studio canvas. To see the transformations you applied, hold the pointer over the Wrangler node and click Properties. Under Directives, the transformations you applied appear.

You can apply more transformations by clicking Wrangle. This takes you back to the Wrangler page. When you return to the node properties in the Pipeline Studio view, you'll see the new transformation you added.

For example, you realize the Country column isn't needed because the value is always 'USA'. You delete the column by following these steps:

  1. Click Wrangle.
  2. Click the down arrow next to Country and select Delete Column.
  3. Click Apply. This closes the Wrangler view and returns to the Wrangler Properties window in the Pipeline Studio. Under the Directives section, drop Country appears.
  4. Click the X button.

Abbreviate the state names

Your delivery vehicle's navigation system only recognizes addresses that contain abbreviated state names (CA, not California), and currently your customer data contains full state names.

The public BigQuery state_abbreviations table contains two columns: one with the full state names and one with the abbreviated state names. You'll use this table to update the state names in your customer data.

View the state names data in BigQuery

  1. In a separate tab, open the BigQuery UI in the Google Cloud console.

    Go to the BigQuery UI

  2. Enter the following query in the Query Editor and click Run.

    SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`

Access the BigQuery table

Add a source in your pipeline that will access this BigQuery state_abbreviations table.

  1. Back in the Cloud Data Fusion tab, in the Pipeline Studio, in the plugin section on the left, click Source.
  2. Click BigQuery.

    A BigQuery source node appears on the canvas, along with the other two nodes.

  3. Hold the pointer over the BigQuery source node and click Properties.
    1. Under Reference Name, enter state_abbreviations.
    2. Under Dataset Project ID Name, enter dis-user-guide.
    3. Under Dataset, enter campaign_tutorial.
    4. Under Table, enter state_abbreviations.
  4. Populate the schema of the table from BigQuery by clicking Get Schema.
  5. Click the X button.

Join the two data sources

Join the two data sources, the customer data and the state abbreviations, to generate output that contains customer data with abbreviated state names.

  1. In the Pipeline Studio, in the plugin section on the left, click Analytics.
  2. Click Joiner.

    A Joiner node, representing an action similar to an SQL Join, appears on the canvas.

  3. Connect the Wrangler node and the BigQuery node to the Joiner node: Drag a connection arrow on the right edge of the source node and drop onto the destination node.
  4. Hold the pointer over the Joiner node and click Properties.
    1. Under Join, expand Wrangler and BigQuery.
      1. Deselect the State field under Wrangler and the name field under BigQuery, since you want only the abbreviated state name and not the full state name.
      2. Under BigQuery, change the alias for the abbreviation field, to State so that it is easy to identify.
    2. Under Join Type, leave the value as Outer. Under Required Inputs select Wrangler.
    3. Under Join Condition, set the join condition to join the State column in the Wrangler node with the name column in the BigQuery node.
    4. Generate the schema of the resultant join. Click Get Schema.
    5. Click the X button.

Store the output to BigQuery

Store the result of your pipeline into a BigQuery table. Where you store your data is called a sink.

  1. In the Pipeline Studio, in the plugin section on the left, click Sink.
  2. Click BigQuery Table.
  3. Connect the Joiner node to the BigQuery Table node.
  4. Hold the pointer over the BigQuery Table node and click Properties.
    1. Under Reference Name, enter customer_data_abbreviated_states.
    2. Under Dataset, enter dis_user_guide.
    3. Under Table, select customer_data_abbreviated_states.
    4. Click the X button.

Deploy and run the pipeline

  1. In the Pipeline Studio, click Name your pipeline and enter CampaignPipeline.
  2. In the upper-right corner, click Deploy.
  3. Once deployment completes, click Run. Running your pipeline can take a few minutes. While you wait, you can observe the Status of the pipeline transition from Provisioning to Starting to Running to Deprovisioning to Succeeded.

View the results

  1. Query the campaign_targets table in the BigQuery UI.

    Go to the BigQuery UI

  2. Update the project name in the query below to your own project name.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the Cloud Data Fusion instance

Follow these instructions to delete your Cloud Data Fusion instance.

Delete the BigQuery dataset

To delete the BigQuery dataset you created as part of this tutorial, do the following:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to the BigQuery page

  2. Select the dis_user_guide dataset.

What's next