This tutorial shows you how to use the Wrangler and Data Pipelines features in Cloud Data Fusion to clean, transform, and process customer data to select candidates for a targeting campaign.
You want to create custom marketing materials for an ongoing campaign promotion, and you want drivers to distribute the materials directly to customers' home mailboxes.
Your campaign has two constraints:
- Location: You only deliver to customers in California, Washington, and Oregon.
- Cost: To save on fuel, you only deliver to customers who live on Avenues (not Roads or Courts) that cars can access customers' homes easily.
To generate the list of customer addresses for the campaign, you use:
- Wrangler to clean the data and apply filters for customers in California, Washington, and Oregon who live on an Avenue.
- Data Pipeline to create a pipeline that:
- Joins the customer data with a public dataset in BigQuery that contains state abbreviations.
- Stores the cleaned and joined data in a BigQuery table that you can query by using the BigQuery web UI or analyze by using Data Studio.
- Connect Cloud Data Fusion to a couple of data sources
- Apply basic transformations
- Join the two data sources
- Write data to a sink
This tutorial uses billable components of GCP, including:
- Cloud Data Fusion
- Cloud Dataproc
Each run of this quickstart runs a 3-node (1 master, 2 workers) Cloud Dataproc cluster that runs for about 6 minutes to write about 6MB of data into BigQuery. Since you will store only a small amount of data in BigQuery, we will ignore the BigQuery cost as part of this estimate. Based on these numbers, your cost of running this pipeline a single time can be approximated as:
Cloud Data Fusion cost + Cloud Dataproc cost = (num_hours * hourly cost of Cloud Data Fusion) + (num_hours * hourly cost of Cloud Dataproc) = (0.1 * 1.8) + (0.1 * 0.01) = 0.18 + 0.001 ~ 18 cents
Before you begin
Sign in to your Google Account.
If you don't already have one, sign up for a new account.
Select or create a GCP project.
Make sure that billing is enabled for your Google Cloud Platform project. Learn how to enable billing.
- Enable the Cloud Data Fusion, Cloud Storage, BigQuery and Cloud Dataproc APIs.
- Create a Cloud Data Fusion instance.
Preparing the customer data
This tutorial requires the following two input datasets, which are pre-configured in your Cloud Data Fusion instance in the Wrangler UI:
- Sample customer data: Available in a bucket named
campaign-tutorialas a CSV file
customers.csvin the Cloud Storage connection Sample Datasets.
- State abbreviations: Available as a BigQuery table
campaign_tutorialdataset in the BigQuery connection Sample Datasets.
Loading the customer data
The data for this tutorial is in a Cloud Storage bucket. The Cloud Storage bucket is publicly available through the Sample Buckets connection and is added to your Cloud Data Fusion installation by default.
- Go to the Cloud Data Fusion UI.
- On the Cloud Data Fusion top menu bar, choose Wrangler.
On the left side is a panel with the pre-configured connections to your data, including the Cloud Storage connection.
- Under Cloud Storage, select the Sample Buckets
On the right side is a list of buckets in the Cloud Storage connection, including the campaign-tutorial.
- Select the campaign-tutorial >
The customer data loads into the Wrangler screen in row/column form.
Cleaning the customer data
You'll now perform some transformations to parse and clean the customer data.
Transforming the data
- To the left of the body column, click the Down arrow.
- Click Parse > CSV and click Apply. The data splits into multiple columns.
- Because the body column isn't needed anymore, click the Down arrow next
bodycolumn and choose Delete column.
You can now set the schema of the data by assigning appropriate names to the remaining columns.
- In the columns tab on the right, click the Column names Down
arrow and select Set all.
Tip: You can also set the column names individually by clicking each column's name in the table and entering a new name.
- To set the schema, insert the
following comma-separated text in the modal and then click Apply:
You've performed some basic transformations. Now you can add some filters to narrow down the target audience of the campaign using the selection criteria described in the beginning of the tutorial.
Filtering the data
Choosing the correct states: Because your campaign targets customers only in California, Oregon, or Washington, you need to remove all rows that contain values other than the ones you want.
- To the left of the State column, click the Down arrow and choose Filter.
- In the filter pane, click Keep Rows and in the drop-down list, select if value matches regex.
- Enter the following regular expression:
- Click Apply to filter.
Only the states that match the desired condition remain.
Choosing the correct street type: Because it's more fuel and cost efficient to deliver to addresses that are centrally located and easy to access by car, you want to keep only the addresses that contain the word Avenue, such as 61 Summit Avenue.
- To the left of the
StreetAddresscolumn, click the Down arrow and choose Filter.
- Click if Value Contains and enter Avenue.
- Select Ignore case.
Wrangler is an interactive, visual tool that lets you see the effects of transformations on a small subset of your data before dispatching large, parallel-processing jobs on the entire dataset. Because it only shows the first 1000 values from your dataset, only a few customers remain.
Creating the pipeline
Basic data cleansing is done, and you've run transformations on a subset of your data. You can now create a batch pipeline to run transformations on all your data.
- On the upper-right side, click Create Pipeline.
- In the dialog that appears, select Batch pipeline.
Once the pipeline studio appears, in the upper left, you should see that Data Pipeline - Batch is selected as the pipeline type.
In the Data Pipelines UI, you will see a GCSFile source node (with the annotation customer) connected into a Wrangler node.
The Wrangler node contains all the transformations you applied in the Wrangler view captured as a directive grammar.
- At this stage, you can apply more transformations by clicking the
For example, because the delivery vehicle's navigation system will be the data consumer and does not require the Country, you can remove the column.
- To delete the Country column, click Wrangle.
- Click the Down arrow next to Country and choose Delete Column.
The column that contains the USA value disappears.
- Click Apply.
- To close the Wrangler area, click the X button.
Abbreviating the state names
The delivery vehicle's navigation system only recognizes addresses that contain abbreviated state names (CA not California). Currently, your customer data contains full state names.
state_abbreviations BigQuery table contains two
columns: one with the full state names and one with the abbreviated state names.
You'll use this table to update the state names in your customer data.
Viewing the state names data in BigQuery
To view the data in the BigQuery UI:
- In a separate tab, open the BigQuery UI in the GCP Console.
- To view the contents of the table
state_abbreviations, enter the following query in the Query Editor and then click Run Query:
SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
You can see that this table contains the names of all states in the United States and their abbreviations.
Accessing the BigQuery table
You'll add a source in your pipeline to access this BigQuery table.
- In the Cloud Data Fusion tab, from the Plugin palette on the
left, choose the BigQuery source from the Source section.
A BigQuery source node appears on the canvas with the two other nodes.
- Point to the BigQuery source node and click Properties.
- To configure the Reference Name, enter state_abbreviations, which is
used to identify this data source for lineage purposes.
The BigQuery Dataset and Table configurations are the Dataset and Table names as found in BigQuery.
- To populate the schema of this table from BigQuery, click Get Schema.
- To save this schema and close the BigQuery Source Node, click Apply.
Joining the two data sources
Now you can join the two data sources—customer data and state abbreviations—to generate output that contains customer data with abbreviated state names.
- Under the Analytics section in the Plugin Palette, choose Joiner.
A Joiner node appears on the canvas.
- To connect the Wrangler node and the BigQuery node to the
Joiner node: Drag a connection arrow
>on the right edge of the source node and drop on the destination node.
To configure the Joiner node, which is similar to an SQL JOIN syntax:
- Select the columns to include from both inputs.
- Add an alias to any duplicate column names.
Since you want only the abbreviated state name and not the full name, exclude the State field from the Wrangler node and the Name field from the State Abbreviations node.
The abbreviated state name is present in the abbreviation field in the State Abbreviations node.
Add the alias
Stateto it so that it is easily identifiable.
Leave the Join type as Outer and set the Join condition to join the
Statecolumn in the Wrangler node to the
namecolumn in the State Abbreviations node.
To generate the schema of the resultant join, click Generate Schema and then click Apply.
You're ready to proceed to the final step of adding a sink to this pipeline.
Storing the output to BigQuery
You'll store the result of your pipeline into a BigQuery table. Where you store your data is called a sink.
- In the Sink section of the Plugin Palette, choose BigQuery Table.
- Connect the Joiner node to the BigQuery node.
Open the BigQuery node and configure the node, as shown below.
You'll use configuration that's similar to the existing BigQuery source except you'll use a different, non-existent table.
Name the pipeline CampaignPipeline.
That's it. You've created your first pipeline and can deploy and run the pipeline.
Deploying and running the pipeline
To deploy CampaignPipeline:
- In the upper-right corner, click Deploy.
Once deployed, click Run and wait for the pipeline to run to completion.
Viewing the results
To view the results after the pipeline runs:
- Query the
campaign_targetstable in the BigQuery UI.
- Update the project name in the query below to your own project name.
To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:
After you've finished the tutorial doc type template tutorial, you can clean up the resources that you created on GCP so they won't take up quota and you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.
Deleting the project
The easiest way to eliminate billing is to delete the project that you created for the tutorial.
To delete the project:
- In the GCP Console, go to the Manage resources page.
- In the project list, select the project you want to delete and click Delete delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Deleting the Cloud Data Fusion instance
Follow these instructions to delete your Cloud Data Fusion instance.
Deleting the BigQuery dataset
To delete the BigQuery dataset you created as part of this tutorial:
- In the GCP Console, go to the BigQuery page.
- Select the
- To delete the dataset, click delete DELETE DATASET toward mid-right side of the page.