Joining streaming data with Dataflow SQL

This tutorial shows you how to use Dataflow SQL to join a stream of data from Pub/Sub with data from a BigQuery table.

Objectives

In this tutorial, you:

  • Write a Dataflow SQL query that joins Pub/Sub streaming data with BigQuery table data.
  • Deploy a Dataflow job from the Dataflow SQL UI.

Costs

This tutorial uses billable components of Google Cloud, including:

  • Dataflow
  • Cloud Storage
  • Pub/Sub

Use the pricing calculator to generate a cost estimate based on your projected usage. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager APIs.

    Enable the APIs
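
    If you already have the gcloud command-line tool installed, you can enable these APIs with a single command instead. This is a sketch; the service names below are the identifiers commonly used for these APIs, and you can confirm them with gcloud services list --available.

      gcloud services enable dataflow.googleapis.com \
        compute.googleapis.com logging.googleapis.com \
        storage-component.googleapis.com storage-api.googleapis.com \
        bigquery.googleapis.com pubsub.googleapis.com \
        cloudresourcemanager.googleapis.com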

  5. Create a service account:

    1. In the Cloud Console, go to the Create service account page.

      Go to Create service account
    2. Select a project.
    3. In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Click the Select a role field.

      Under Quick access, click Basic, then click Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Cloud Console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
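
    Alternatively, if you already have the gcloud tool installed, the service account and key from the previous two steps can be created from the command line. This is a sketch; sa-name, project-id, and key.json are placeholders for your own values.

      gcloud iam service-accounts create sa-name
      gcloud projects add-iam-policy-binding project-id \
        --member="serviceAccount:sa-name@project-id.iam.gserviceaccount.com" \
        --role="roles/owner"
      gcloud iam service-accounts keys create key.json \
        --iam-account=sa-name@project-id.iam.gserviceaccount.com
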
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
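
    For example, in a Linux or macOS shell, replace key-path with the path of the JSON key file that you downloaded:

      export GOOGLE_APPLICATION_CREDENTIALS="key-path"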

  8. Install and initialize the Cloud SDK. Choose one of the installation options. You might need to set the project property to the project that you are using for this walkthrough.
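
    For example, to set the project property, run the following command and replace project-id with your project ID:

      gcloud config set project project-id
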
  9. Go to the Dataflow SQL web UI in the Cloud Console. This opens your most recently accessed project. To switch to a different project, click the name of the project at the top of the Dataflow SQL web UI, and search for the project you want to use.
    Go to the Dataflow SQL web UI

Create example sources

To follow the example provided in this tutorial, create the following sources and use them in the steps that follow.

  • A Pub/Sub topic called transactions - A stream of transaction data that arrives via a subscription to the Pub/Sub topic. The data for each transaction includes information like the product purchased, the sale price, and the city and state in which the purchase occurred. After you create the Pub/Sub topic, you create a script that publishes messages to your topic. You will run this script in a later section of this tutorial.
  • A BigQuery table called us_state_salesregions - A table that provides a mapping of states to sales regions. Before you create this table, you need to create a BigQuery dataset.
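
If you prefer to create these sources from the command line, the following is a minimal sketch. It assumes the dataset name dataflow_sql_tutorial that is used later in this tutorial, a two-column schema for us_state_salesregions (state_code and sales_region), and a local CSV file named us_state_salesregions.csv as a placeholder for your own state-to-region mapping. Replace project-id with your project ID.

# Create the Pub/Sub topic for the transaction stream.
gcloud pubsub topics create transactions

# Create the BigQuery dataset that holds the lookup table.
bq mk --dataset project-id:dataflow_sql_tutorial

# Create the us_state_salesregions lookup table and load the mapping rows.
bq mk --table project-id:dataflow_sql_tutorial.us_state_salesregions \
  state_code:STRING,sales_region:STRING
bq load --source_format=CSV \
  project-id:dataflow_sql_tutorial.us_state_salesregions \
  us_state_salesregions.csv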

Assign a schema to your Pub/Sub topic

Assigning a schema lets you run SQL queries on your Pub/Sub topic data. Currently, Dataflow SQL expects messages in Pub/Sub topics to be serialized in JSON format.

To assign a schema to the example Pub/Sub topic transactions:

  1. Create a text file and name it transactions_schema.yaml. Copy and paste the following schema text into transactions_schema.yaml.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: tr_time_str
    description: Transaction time string
    mode: NULLABLE
    type: STRING
  - column: first_name
    description: First name
    mode: NULLABLE
    type: STRING
  - column: last_name
    description: Last name
    mode: NULLABLE
    type: STRING
  - column: city
    description: City
    mode: NULLABLE
    type: STRING
  - column: state
    description: State
    mode: NULLABLE
    type: STRING
  - column: product
    description: Product
    mode: NULLABLE
    type: STRING
  - column: amount
    description: Amount of transaction
    mode: NULLABLE
    type: FLOAT
  2. Assign the schema using the gcloud command-line tool.

    a. Update the gcloud tool with the following command. Ensure that the gcloud tool version is 242.0.0 or higher.

      gcloud components update
    

    b. Run the following command in a command-line window. Replace project-id with your project ID, and path-to-file with the path to your transactions_schema.yaml file.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    For more information about the command's parameters and allowed schema file formats, see the documentation page for gcloud data-catalog entries update.

    c. Confirm that your schema was successfully assigned to the transactions Pub/Sub topic. Replace project-id with your project ID.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    
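
After the schema is assigned, you can publish a test message whose JSON payload matches it. The following is an illustrative sketch; the field values are made up, and the event_timestamp column is typically populated from the Pub/Sub publish time rather than from the message body.

gcloud pubsub topics publish transactions \
  --message '{"tr_time_str": "2021-03-01 12:00:00", "first_name": "Ada", "last_name": "Lovelace", "city": "Seattle", "state": "WA", "product": "Headphones", "amount": 59.99}'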

Find Pub/Sub sources

The Dataflow SQL UI provides a way to find Pub/Sub data source objects for any project you have access to, so you don't have to remember their full names.

For the example in this tutorial, search for the transactions Pub/Sub topic that you created:

  1. In the left panel, search for projectid=project-id transactions. Replace project-id with your project ID.

    Data Catalog search panel in Dataflow SQL workspace.

View the schema

  1. In the left explorer panel of the Dataflow SQL UI, click transactions, or search for the Pub/Sub topic by typing projectid=project-id system=cloud_pubsub, and then select the topic.
  2. Under Schema, you can view the schema you assigned to the Pub/Sub topic.

    Schema assigned to the topic including list of field names and their descriptions.

Create a SQL query

The Dataflow SQL UI lets you create SQL queries to run your Dataflow jobs.

The following SQL query is a data enrichment query. It adds a sales_region field to the Pub/Sub stream of events (transactions), using a BigQuery table (us_state_salesregions) that maps states to sales regions.

Copy and paste the following SQL query into the Query editor. Replace project-id with your project ID.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions AS tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

When you enter a query in the Dataflow SQL UI, the query validator verifies the query syntax. A green check mark icon is displayed if the query is valid. If the query is invalid, a red exclamation point icon is displayed. If your query syntax is invalid, clicking on the validator icon provides information about what you need to fix.

The following screenshot shows the valid query in the Query editor. The validator displays a green check mark.

Dataflow SQL workspace with the query from the tutorial visible in the editor.

Create a Dataflow job to run your SQL query

To run your SQL query, create a Dataflow job from the Dataflow SQL UI.

  1. Above the Query editor, click Create job.

  2. In the Create Dataflow job panel that opens on the right, under the Destination option, select BigQuery. Then, for Dataset ID, select dataflow_sql_tutorial and set the Table name to sales.

    Create Dataflow SQL Job form.
  3. (Optional) Dataflow automatically chooses the settings that are optimal for your Dataflow SQL job, but you can expand the Optional parameters menu to manually specify the following pipeline options:

    • Maximum number of workers
    • Zone
    • Service account email
    • Machine type
    • Additional experiments
    • Worker IP address configuration
    • Network
    • Subnetwork
  4. Click Create. Your Dataflow job will take a few minutes to start running.
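
As an alternative to the UI, you can submit the same job with the gcloud tool. The following is a sketch; the job name and region are placeholders you can change, and you still replace project-id with your project ID.

gcloud dataflow sql query \
  --job-name=dfsql-transactions-join \
  --region=us-central1 \
  --bigquery-dataset=dataflow_sql_tutorial \
  --bigquery-table=sales \
  'SELECT tr.*, sr.sales_region
   FROM pubsub.topic.`project-id`.transactions AS tr
     INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
     ON tr.state = sr.state_code'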

View the Dataflow job

Dataflow turns your SQL query into an Apache Beam pipeline. In the Dataflow web UI that opened in a new browser tab, you can see a graphical representation of your pipeline.

Pipeline from SQL query shown in Dataflow web UI.

You can click the boxes to see a breakdown of the transformations occurring in the pipeline. For example, if you click the top box in the graphical representation, labeled Run SQL Query, a graphic appears that shows the operations taking place behind the scenes.

The top two boxes represent the two inputs you joined: the Pub/Sub topic, transactions, and the BigQuery table, us_state_salesregions.

Write output of a join of two inputs completes in 25 seconds.

To view the output table that contains the job results, go to the BigQuery UI. In the left Explorer panel, under your project, click the dataflow_sql_tutorial dataset that you created. Then click the output table, sales. The Preview tab displays the contents of the output table.

The sales preview table contains columns for tr_time_str, first_name, last_name, city, state, product, amount, and sales_region.
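
You can also inspect the output from the command line with the bq tool. For example, replace project-id with your project ID:

bq query --use_legacy_sql=false \
  'SELECT * FROM `project-id.dataflow_sql_tutorial.sales` LIMIT 10'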

View past jobs and edit your queries

The Dataflow UI stores past jobs and queries in the Jobs page.

You can use the job history list to see previous SQL queries. For example, suppose you want to modify your query to aggregate sales by sales region every 15 seconds. Use the Jobs page to access the running job that you started earlier in the tutorial, copy the SQL query, and run another job with a modified query.

  1. From the Dataflow Jobs page, click on the job you want to edit.

  2. On the Jobs detail page, locate the SQL query in the right panel under Pipeline options. Find the row for queryString.

    The job pipeline option named queryString.
  3. Copy and paste your SQL query in the Query editor to add tumbling windows. If you copy the following query, replace project-id with your project ID.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) AS amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Click Create job to create a new job with the modified query.

Clean up

To avoid incurring charges to your Cloud Billing account for the resources used in this tutorial:

  1. Stop your transactions_injector.py publishing script if it is still running.

  2. Stop your running Dataflow jobs. Go to the Dataflow web UI in the Cloud Console.

    Go to the Dataflow web UI

    For each job that you created by following this walkthrough, do the following steps:

    1. Click the name of the job.

    2. In the Job summary panel for the job, click Stop job. The Stop Job dialog appears with your options for how to stop your job.

    3. Click Cancel.

    4. Click Stop job. The service halts all data ingestion and processing as soon as possible. Because Cancel halts processing immediately, you might lose in-flight data. Stopping a job might take a few minutes.

  3. Delete your BigQuery dataset. Go to the BigQuery web UI in the Cloud Console.

    Go to the BigQuery web UI

    1. In the Explorer panel, in the Resources section, click the dataflow_sql_tutorial dataset you created.

    2. In the details panel, on the right side, click Delete dataset. This action deletes the dataset, the table, and all the data.

    3. In the Delete dataset dialog box, confirm the delete command by typing the name of your dataset (dataflow_sql_tutorial) and then click Delete.

  4. Delete your Pub/Sub topic. Go to the Pub/Sub topics page in the Cloud Console.

    Go to the Pub/Sub topics page

    1. Check the checkbox next to the transactions topic.

    2. Click Delete to permanently delete the topic.

    3. Go to the Pub/Sub subscriptions page.

    4. Check the checkbox next to any remaining subscriptions to transactions. If your jobs are not running anymore, there might not be any subscriptions.

    5. Click Delete to permanently delete the subscriptions.

  5. Delete the Dataflow staging bucket in Cloud Storage. Go to the Cloud Storage browser in the Cloud Console.

    Go to the Cloud Storage browser

    1. Check the checkbox next to the Dataflow staging bucket.

    2. Click Delete to permanently delete the bucket.
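
Most of these cleanup steps can also be performed from the command line. The following is a sketch; region, job-id, subscription-id, and bucket-name are placeholders for your own values, and project-id is your project ID.

# Cancel any Dataflow jobs from this tutorial that are still running.
gcloud dataflow jobs list --status=active --region=region
gcloud dataflow jobs cancel job-id --region=region

# Delete the BigQuery dataset, including the sales output table.
bq rm -r -f -d project-id:dataflow_sql_tutorial

# Delete the Pub/Sub topic and any remaining subscriptions.
gcloud pubsub topics delete transactions
gcloud pubsub subscriptions delete subscription-id

# Delete the Dataflow staging bucket and its contents.
gsutil rm -r gs://bucket-name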

What's next