Using Cloud Dataflow SQL

This tutorial shows you how to run Cloud Dataflow jobs by using SQL and the Cloud Dataflow SQL UI. To demonstrate this workflow, the tutorial walks you through an example that joins a stream of data from Cloud Pub/Sub with data from a BigQuery table.

Objectives

In this tutorial, you:

  • Use SQL to join Cloud Pub/Sub streaming data with BigQuery table data
  • Deploy a Cloud Dataflow job from the Cloud Dataflow SQL UI

Costs

This tutorial uses billable components of Google Cloud Platform, including:

  • Cloud Dataflow
  • Cloud Storage
  • Cloud Pub/Sub

Use the pricing calculator to generate a cost estimate based on your projected usage. New GCP users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the Project selector page

  3. Make sure that billing is enabled for your GCP project.

    Learn how to enable billing

  4. Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager APIs.

    Enable the APIs

  5. Set up authentication:
    1. In the GCP Console, go to the Create service account key page.

      Go to the Create Service Account Key page
    2. From the Service account list, select New service account.
    3. In the Service account name field, enter a name.
    4. From the Role list, select Project > Owner.

      Note: The Role field authorizes your service account to access resources. You can view and change this field later by using the GCP Console. If you are developing a production app, specify more granular permissions than Project > Owner. For more information, see granting roles to service accounts.
    5. Click Create. A JSON file that contains your key downloads to your computer.
  6. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again. Example commands appear after this list.

  7. Install and initialize the Cloud SDK. Choose one of the installation options. You might need to set the project property to the project that you are using for this walkthrough.
  8. Go to the BigQuery web UI in the GCP Console. This opens your most recently accessed project. To switch to a different project, click the name of the project at the top of the BigQuery web UI, and search for the project you want to use.
    Go to the BigQuery web UI
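
If you work from a terminal, the sketch below covers steps 6 and 7. The key file path and project ID are placeholders; substitute your own values.

  # Point Application Default Credentials at the downloaded service account key.
  # This setting applies only to the current shell session.
  export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-key.json"

  # Set the default project for the gcloud tool (after installing and initializing the Cloud SDK).
  gcloud config set project project-id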

Switch to the Cloud Dataflow SQL UI

In the BigQuery web UI, follow these steps to switch to the Cloud Dataflow SQL UI.

  1. Click the More drop-down menu and select Query settings.

  2. In the Query settings menu that opens on the right, select Cloud Dataflow engine.

  3. If your project does not have the Cloud Dataflow and Data Catalog APIs enabled, you are prompted to enable them. Click Enable APIs. Enabling both APIs can take a few minutes. You can also enable them with the gcloud tool, as shown after these steps.

  4. When the APIs are enabled, click Save.
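
If you prefer the command line, you can enable the same APIs with the gcloud tool instead of waiting for the prompt. This is a sketch that assumes the standard service IDs; replace project-id with your project ID.

  # Enable the Cloud Dataflow and Data Catalog services for your project.
  gcloud services enable dataflow.googleapis.com datacatalog.googleapis.com \
    --project=project-id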

Create example sources

To follow the example in this tutorial, create the following sources and use them in the later steps. Example commands for creating them appear after this list.

  • A Cloud Pub/Sub topic called transactions - A stream of transaction data that arrives via a subscription to the Cloud Pub/Sub topic. The data for each transaction includes information like the product purchased, the sale price, and the city and state in which the purchase occurred. After you create the Cloud Pub/Sub topic, you create a script that publishes messages to your topic. You will run this script in a later section of this tutorial.
  • A BigQuery table called us_state_salesregions - A table that provides a mapping of states to sales regions. Before you create this table, you need to create a BigQuery dataset.
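
One way to create these sources from the command line is sketched below. The dataset and table names match the ones used later in this tutorial, the two-column table schema (state_code, sales_region) is inferred from the join query in the Create a SQL query section, and you still need to load your own state-to-region rows (for example, with bq load). Replace project-id with your project ID.

  # Create the Cloud Pub/Sub topic that receives the transaction stream.
  gcloud pubsub topics create transactions

  # Create the BigQuery dataset and the state-to-region mapping table.
  bq mk --dataset project-id:dataflow_sql_dataset
  bq mk --table project-id:dataflow_sql_dataset.us_state_salesregions \
    state_code:STRING,sales_region:STRING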

Assign a schema to your Cloud Pub/Sub topic

Assigning a schema lets you run SQL queries on your Cloud Pub/Sub topic data. Currently, Cloud Dataflow SQL expects messages in Cloud Pub/Sub topics to be serialized in JSON format. Support for other formats such as Avro will be added in the future.
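
For example, a single message that matches the payload schema shown below can be published for testing with the gcloud tool. The field values here are made up; the tutorial's transactions_injector.py script publishes a stream of messages of this shape.

  # Publish one JSON-serialized test transaction to the transactions topic.
  gcloud pubsub topics publish transactions --message \
    '{"tr_time_str": "2019-06-14 10:00:00", "first_name": "Jane", "last_name": "Doe", "city": "Seattle", "state": "WA", "product": "Bicycle", "amount": 315.99}'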

To assign a schema to the example Cloud Pub/Sub topic transactions:

  1. Create a text file and name it transactions_schema.yaml. Copy and paste the following schema text into transactions_schema.yaml.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: attributes
    description: Pub/Sub message attributes
    mode: NULLABLE
    type: MAP<STRING,STRING>
  - column: payload
    description: Pub/Sub message payload
    mode: NULLABLE
    type: STRUCT
    subcolumns:
    - column: tr_time_str
      description: Transaction time string
      mode: NULLABLE
      type: STRING
    - column: first_name
      description: First name
      mode: NULLABLE
      type: STRING
    - column: last_name
      description: Last name
      mode: NULLABLE
      type: STRING
    - column: city
      description: City
      mode: NULLABLE
      type: STRING
    - column: state
      description: State
      mode: NULLABLE
      type: STRING
    - column: product
      description: Product
      mode: NULLABLE
      type: STRING
    - column: amount
      description: Amount of transaction
      mode: NULLABLE
      type: FLOAT
  2. Assign the schema using the gcloud command-line tool.

    a. Update the gcloud tool with the following command. Ensure that the gcloud tool version is 242.0.0 or higher.

      gcloud components update
    

    b. Run the following command in a command-line window. Replace project-id with your project ID, and path-to-file with the path to your transactions_schema.yaml file.

      gcloud beta data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    For more information about the command's parameters and allowed schema file formats, see the documentation page for gcloud beta data-catalog entries update.

    c. Confirm that your schema was successfully assigned to the transactions Cloud Pub/Sub topic. Replace project-id with your project ID.

      gcloud beta data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Find Cloud Pub/Sub sources

The Cloud Dataflow SQL UI provides a way to find Cloud Pub/Sub data source objects for any project you have access to, so you don't have to remember their full names.

For the example in this tutorial, add the transactions Cloud Pub/Sub topic that you created:

  1. In the left navigation panel, click the Add data drop-down list and select Cloud Dataflow sources.

  2. In the Add Cloud Dataflow source panel that opens on the right, choose Cloud Pub/Sub topics. In the search box, search for transactions. Select the topic and click Add.

View the schema

  1. In the left navigation panel of the Cloud Dataflow SQL UI, click Cloud Dataflow sources.
  2. Click Cloud Pub/Sub topics.
  3. Click transactions.
  4. Under Schema, you can view the schema you assigned to the transactions Cloud Pub/Sub topic.

Create a SQL query

The Cloud Dataflow SQL UI lets you create SQL queries to run your Cloud Dataflow jobs.

The following SQL query is a data enrichment query. It adds an additional field, sales_region, to the Cloud Pub/Sub stream of events (transactions), using a BigQuery table (us_state_salesregions) that maps states to sales regions.

Copy and paste the following SQL query into the Query editor. Replace project-id with your project ID.

SELECT tr.payload.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.payload.state = sr.state_code

When you enter a query in the Cloud Dataflow SQL UI, the query validator verifies the query syntax. A green check mark icon is displayed if the query is valid. If the query is invalid, a red exclamation point icon is displayed. If your query syntax is invalid, clicking on the validator icon provides information about what you need to fix.

The following screenshot shows the valid query in the Query editor. The validator displays a green check mark.

Create a Cloud Dataflow job to run your SQL query

To run your SQL query, create a Cloud Dataflow job from the Cloud Dataflow SQL UI.

  1. Below the Query editor, click Create Cloud Dataflow job.

  2. In the Create Cloud Dataflow job panel that opens on the right, change the default Table name to dfsqltable_sales.

  3. Click Create. Your Cloud Dataflow job will take a few minutes to start running.

  4. The Query results panel appears in the UI. To return to a job's Query results panel later, find the job in the Job history panel and use the Open in query editor button, as described in View past jobs and edit your queries.

  5. Under Job information, click the Job ID link. This opens a new browser tab with the Cloud Dataflow Job Details page in the Cloud Dataflow web UI.
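
You can also check on the job from the command line. The following is a sketch using standard gcloud Dataflow commands; depending on your setup you might need to add a --region flag.

  # List active Cloud Dataflow jobs in the project and note the job ID.
  gcloud dataflow jobs list --status=active

  # Show the details of a specific job. Replace job-id with the ID from the list.
  gcloud dataflow jobs describe job-id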

View the Cloud Dataflow job and output

Cloud Dataflow turns your SQL query into an Apache Beam pipeline. In the Cloud Dataflow web UI that opened in a new browser tab, you can see a graphical representation of your pipeline.

You can click the boxes to see a breakdown of the transformations occurring in the pipeline. For example, if you click the top box in the graphical representation, labeled Run SQL Query, a graphic appears that shows the operations taking place behind the scenes.

The top two boxes represent the two inputs you joined: the Cloud Pub/Sub topic, transactions, and the BigQuery table, us_state_salesregions.

To view the output table that contains the job results, go back to the browser tab with the Cloud Dataflow SQL UI. In the left navigation panel, under your project, click the dataflow_sql_dataset dataset you created. Then, click on the output table, dfsqltable_sales. The Preview tab displays the contents of the output table.
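
You can also inspect the output table from the command line with the bq tool. This sketch assumes the dataset and table names used in this tutorial; replace project-id with your project ID.

  # Preview the first rows that the Cloud Dataflow job has written so far.
  bq head --max_rows=10 project-id:dataflow_sql_dataset.dfsqltable_sales

  # Or run a standard SQL query against the output table.
  bq query --use_legacy_sql=false \
    'SELECT sales_region, amount FROM `project-id.dataflow_sql_dataset.dfsqltable_sales` LIMIT 10'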

View past jobs and edit your queries

The Cloud Dataflow SQL UI stores past jobs and queries in the Job history panel. Jobs are listed by the day they started; days that contain running jobs appear first, followed by days with no running jobs.

You can use the job history list to edit previous SQL queries and run new Cloud Dataflow jobs. For example, suppose that you want to modify your query to aggregate sales by sales region every 15 seconds. Use the Job history panel to access the running job that you started earlier in the tutorial, change the SQL query, and run another job with the modified query.

  1. In the left navigation panel, click Job history.

  2. Under Job history, click Cloud Dataflow. All past jobs for your project appear.

  3. Click on the job you want to edit. Click Open in query editor.

  4. Edit your SQL query in the Query editor to add tumbling windows. Replace project-id with your project ID if you copy the following query.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.payload.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.payload.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. Below the Query editor, click Create Cloud Dataflow job to create a new job with the modified query.

Clean up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, follow these steps (an equivalent command-line sketch appears after them):

  1. Stop your transactions_injector.py publishing script if it is still running.

  2. Stop your running Cloud Dataflow jobs. Go to the Cloud Dataflow web UI in the GCP Console.

    Go to the Cloud Dataflow web UI

    For each job you created from following this walkthrough, do the following steps:

    1. Click the name of the job.

    2. In the Job summary panel for the job, click Stop job. The Stop Job dialog appears with your options for how to stop your job.

    3. Select Cancel.

    4. Click Stop job. The service halts all data ingestion and processing as soon as possible. Because Cancel immediately halts processing, you might lose any "in-flight" data. Stopping a job might take a few minutes.

  3. Delete your BigQuery dataset. Go to the BigQuery web UI in the GCP Console.

    Go to the BigQuery web UI

    1. In the navigation panel, in the Resources section, click the dataflow_sql_dataset dataset you created.

    2. In the details panel, on the right side, click Delete dataset. This action deletes the dataset, the table, and all the data.

    3. In the Delete dataset dialog box, confirm the delete command by typing the name of your dataset (dataflow_sql_dataset) and then click Delete.

  4. Delete your Cloud Pub/Sub topic. Go to the Cloud Pub/Sub topics page in the GCP Console.

    Go to the Cloud Pub/Sub topics page

    1. Check the checkbox next to the transactions topic.

    2. Click Delete to permanently delete the topic.

    3. Go to the Cloud Pub/Sub subscriptions page.

    4. Check the checkbox next to any remaining subscriptions to transactions. If your jobs are not running anymore, there might not be any subscriptions.

    5. Click Delete to permanently delete the subscriptions.

  5. Delete the Cloud Dataflow staging bucket in Cloud Storage. Go to the Cloud Storage browser in the GCP Console.

    Go to the Cloud Storage browser

    1. Check the checkbox next to the Cloud Dataflow staging bucket.

    2. Click Delete to permanently delete the bucket.
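
As a command-line alternative, the same cleanup can be sketched with gcloud, bq, and gsutil. The job ID, subscription name, and staging bucket name are placeholders; substitute the values from your project. Cancelling a job this way also discards in-flight data.

  # Cancel the running Cloud Dataflow job(s). You might need to add a --region flag.
  gcloud dataflow jobs cancel job-id

  # Delete the BigQuery dataset, its tables, and all of their data.
  bq rm -r -f --dataset project-id:dataflow_sql_dataset

  # Delete the Cloud Pub/Sub topic and any remaining subscriptions to it.
  gcloud pubsub topics delete transactions
  gcloud pubsub subscriptions delete subscription-name

  # Delete the Cloud Dataflow staging bucket and its contents.
  gsutil rm -r gs://staging-bucket-name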

What's next
