Create a Dataflow pipeline using SQL
In this quickstart, you write a SQL query against a publicly available Pub/Sub topic. The query runs as a Dataflow pipeline, and the results of the pipeline are written to a BigQuery table.
To run a Dataflow SQL job, you can use the Google Cloud console, the Google Cloud CLI installed on a local machine, or Cloud Shell. In addition to the console, this example requires either a local machine or Cloud Shell, because several steps use the gcloud and bq command-line tools.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, and Google Cloud Data Catalog APIs. You can enable them in the console, or from the command line as shown in the sketch after this list.
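If you prefer to enable the APIs from the command line, a single gcloud command can enable them all. This is a sketch: the service IDs below are the commonly documented ones for these products, so verify them before relying on this.

```
# Enable the APIs this quickstart needs. Service IDs are assumed; verify with
# `gcloud services list --available` if any of them fail to resolve.
gcloud services enable \
    dataflow.googleapis.com \
    compute.googleapis.com \
    logging.googleapis.com \
    storage-component.googleapis.com \
    storage-api.googleapis.com \
    bigquery.googleapis.com \
    pubsub.googleapis.com \
    cloudresourcemanager.googleapis.com \
    datacatalog.googleapis.com
```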
Install and initialize the gcloud CLI
Download the gcloud CLI package for your operating system, and then install and configure the gcloud CLI.
Depending on your internet connection, the download might take a while.
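After the download completes, initialization typically looks like the following. PROJECT_ID is a placeholder for your own project ID:

```
# Authenticate and choose default settings interactively.
gcloud init

# Alternatively, point an already-configured CLI at your project.
gcloud config set project PROJECT_ID
```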
Create a BigQuery dataset
In this quickstart, the Dataflow SQL pipeline writes its output to a BigQuery table that is created when you run the pipeline in the next section. That table must belong to a BigQuery dataset, which you create here.
Create a BigQuery dataset that is named taxirides:

```
bq mk taxirides
```
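To confirm that the dataset exists before moving on, you can list the datasets in your project; taxirides should appear in the output:

```
# List datasets in the current project.
bq ls
```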
Run the pipeline
Run a Dataflow SQL pipeline that calculates the number of passengers picked up per minute by using data from a publicly available Pub/Sub topic about taxi rides. This command also creates a BigQuery table that's named passengers_per_minute to store the data output:

```
gcloud dataflow sql query \
    --job-name=dataflow-sql-quickstart \
    --region=us-central1 \
    --bigquery-dataset=taxirides \
    --bigquery-table=passengers_per_minute \
    'SELECT
       TUMBLE_START("INTERVAL 60 SECOND") AS period_start,
       SUM(passenger_count) AS pickup_count
     FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
     WHERE ride_status = "pickup"
     GROUP BY TUMBLE(event_timestamp, "INTERVAL 60 SECOND")'
```
It might take a while for the Dataflow SQL job to start running.
The following describes the values used in the Dataflow SQL pipeline:

- dataflow-sql-quickstart: the name of the Dataflow job
- us-central1: the region where the job runs
- taxirides: the name of the BigQuery dataset used as the sink
- passengers_per_minute: the name of the BigQuery table
- taxirides-realtime: the name of the Pub/Sub topic used as the source
The SQL command queries the Pub/Sub topic taxirides-realtime for the total number of passengers picked up every 60 seconds. This public topic is based on the NYC Taxi & Limousine Commission's open dataset.
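If you want to see the raw messages that the query consumes, one possible approach is to attach a temporary subscription to the public topic and pull a few messages. The subscription name taxi-peek is arbitrary:

```
# Attach a temporary subscription to the public topic.
gcloud pubsub subscriptions create taxi-peek \
    --topic=projects/pubsub-public-data/topics/taxirides-realtime

# Pull and acknowledge a few messages to inspect the JSON payload.
gcloud pubsub subscriptions pull taxi-peek --limit=3 --auto-ack

# Remove the subscription when you're done.
gcloud pubsub subscriptions delete taxi-peek
```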
View the results
Verify that the pipeline is running.
Console
In the console, go to the Dataflow Jobs page.
In the list of jobs, click dataflow-sql-quickstart.
In the Job info panel, confirm that the Job status field is set to Running.
The job might take several minutes to start. The Job status is set to Queued until the job starts.
In the Job graph tab, confirm that every step is running.
After the job starts, the steps might take several minutes to start running.
In the console, go to the BigQuery page.
In the Editor, paste the following SQL query and click Run:
```
SELECT * FROM taxirides.passengers_per_minute ORDER BY pickup_count DESC LIMIT 5
```
This query returns the busiest intervals from the passengers_per_minute table.
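If you would rather watch the most recent windows arrive than find the busiest ones, a small variation on the same query orders by the window start time instead:

```
SELECT * FROM taxirides.passengers_per_minute ORDER BY period_start DESC LIMIT 5
```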
gcloud
Get the list of Dataflow jobs running in your project:
```
gcloud dataflow jobs list
```
Get more information about the dataflow-sql-quickstart job:

```
gcloud dataflow jobs describe JOB_ID
```

Replace JOB_ID with the job ID of the dataflow-sql-quickstart job from your project.

Return the busiest intervals from the passengers_per_minute table:

```
bq query \
    'SELECT * FROM taxirides.passengers_per_minute ORDER BY pickup_count DESC LIMIT 5'
```
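The describe output is verbose. If you only need the job state, gcloud's --format flag can extract a single field; the currentState field name below assumes the standard Dataflow job resource:

```
# Print only the job state, for example JOB_STATE_RUNNING.
gcloud dataflow jobs describe JOB_ID --format='value(currentState)'
```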
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
To cancel the Dataflow job, go to the Jobs page.
In the list of jobs, click dataflow-sql-quickstart.
Click Stop > Cancel > Stop job.
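If you prefer the command line, you can also cancel the job with gcloud. Replace JOB_ID as before:

```
gcloud dataflow jobs cancel JOB_ID --region=us-central1
```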
Delete the taxirides dataset and the table it contains:

```
bq rm -r taxirides
```

To confirm the deletion, type y.
What's next
- Learn more about using Dataflow SQL.
- Explore the gcloud CLI for Dataflow SQL.