This tutorial uses the Pub/Sub Topic to BigQuery template to create and run a Dataflow template job using the Google Cloud console or Google Cloud CLI. The tutorial walks you through a streaming pipeline example that reads JSON-encoded messages from Pub/Sub, transforms message data with the Apache Beam SDK, and writes the results to a BigQuery table.
Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data. Pub/Sub enables you to create systems of event producers and consumers, called publishers and subscribers. Publishers send events to the Pub/Sub service asynchronously, and Pub/Sub delivers the events to all services that need to react to them.
Dataflow is a fully managed service for transforming and enriching data in stream (real-time) and batch modes. It provides a simplified pipeline development environment that uses the Apache Beam SDK to transform incoming data and then output the transformed data.
Objectives
- Create a Pub/Sub topic.
- Create a BigQuery dataset with a table and schema.
- Use a Google-provided streaming template to stream data from your Pub/Sub topic to BigQuery using Dataflow.
Costs
This tutorial uses the following billable components of Google Cloud:
- Dataflow
- Pub/Sub
- Cloud Storage
- Cloud Scheduler
- BigQuery
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.
Before you begin
This section shows you how to enable APIs, create a service account, and grant an Owner role to the service account. In a production environment, don't grant the Owner role. Instead, use the appropriate Dataflow-specific permissions and roles.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, and Cloud Scheduler APIs.
- If you're using the Google Cloud CLI to complete this tutorial, install and initialize the Google Cloud CLI. If you're using the Google Cloud console to complete this tutorial, skip this step.
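If you follow the gcloud path, one optional way to prepare your shell (shown here only as a sketch) is to authenticate and set a default project so that later commands don't need an explicit project flag. Replace PROJECT_ID with your project ID.
gcloud auth login
gcloud config set project PROJECT_ID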
Create the example source and sink
This section explains how to create the following:
- A streaming source of data using Pub/Sub
- A dataset to load the data into BigQuery
Create a Cloud Storage bucket
Begin by creating a Cloud Storage bucket using the Google Cloud console or Google Cloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.
Console
In the Google Cloud console, go to the Cloud Storage Browser page.
Click Create bucket.
On the Create a bucket page, for Name your bucket, enter a name that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique. Do not select the other options.
Click Create.
gcloud
Use the gsutil mb command:
gsutil mb gs://BUCKET_NAME
Replace BUCKET_NAME with a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
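Optionally, you can confirm that the bucket exists before moving on; this check isn't part of the required steps.
gsutil ls -b gs://BUCKET_NAME
The command prints the bucket URL if the bucket was created successfully.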
Create a Pub/Sub topic and subscription
Create a Pub/Sub topic and then create a subscription to that topic.
To create a topic, complete the following steps.
Console
In the Google Cloud console, go to the Pub/Sub Topics page.
Click Create topic.
In the Topic ID field, enter an ID for your topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.
Retain the option Add a default subscription. Do not select the other options.
Click Create topic.
gcloud
To create a topic, run the gcloud pubsub topics create command. For information about how to name a topic, see Guidelines to name a topic or a subscription.
gcloud pubsub topics create TOPIC_ID
Replace TOPIC_ID with a name for your Pub/Sub topic.
To create a subscription to your topic, run the gcloud pubsub subscriptions create command:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Replace SUBSCRIPTION_ID with a name for your Pub/Sub subscription.
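If you want to verify the topic and subscription before continuing, you can publish a test message and pull it back. This optional check uses the same message format that the Cloud Scheduler jobs send later in the tutorial.
gcloud pubsub topics publish TOPIC_ID --message='{"url": "https://beam.apache.org/", "review": "positive"}'
gcloud pubsub subscriptions pull SUBSCRIPTION_ID --auto-ack --limit=1
Any message that you pull and acknowledge here is removed from the subscription and is not delivered again.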
Create and run Cloud Scheduler jobs
Create and run two Cloud Scheduler jobs, one that publishes "positive ratings" and a second that publishes "negative ratings" to your Pub/Sub topic.
Console
Create a Cloud Scheduler job for positive ratings.
Visit the Cloud Scheduler page in the console.
Click the Create a job button.
Enter the name positive-ratings-publisher.
Select a region for your job to run in. Choose a Dataflow region close to where you run the commands in this tutorial. For more information about regions and locations, see Dataflow locations.
Specify the frequency for your job, using the unix-cron format:
* * * * *
See Configuring Cron Job Schedules for more information.
Select your timezone.
Click Continue.
Select Pub/Sub from the Target dropdown.
Select your Topic name from the dropdown.
Add the following Message string to be sent to your target:
{"url": "https://beam.apache.org/", "review": "positive"}
Click Create.
You now have a cron job that sends a message with a positive rating to your Pub/Sub topic every minute. The Dataflow pipeline that you run later in this tutorial reads these messages from that topic.
Create a Cloud Scheduler job for negative ratings.
On the Cloud Scheduler page in the console, click the Create a job button.
Enter the name negative-ratings-publisher.
Select a region for your job to run in.
Specify the frequency for your job, using the unix-cron format:
*/2 * * * *
See Configuring Cron Job Schedules for more information.
Select your timezone.
Click Continue.
Select Pub/Sub from the Target dropdown.
Select your Topic name from the dropdown.
Add the following Message string to be sent to your target:
{"url": "https://beam.apache.org/", "review": "negative"}
Click Create.
You now have a cron job that sends a message with a negative rating to your Pub/Sub topic every two minutes. The Dataflow pipeline that you run later in this tutorial reads these messages from that topic.
gcloud
To create a Cloud Scheduler job for this tutorial, use the gcloud scheduler jobs create command. This step creates a publisher for "positive ratings" that publishes one message per minute.
gcloud scheduler jobs create pubsub positive-ratings-publisher \
  --schedule="* * * * *" \
  --location=DATAFLOW_REGION \
  --topic="TOPIC_ID" \
  --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
Replace DATAFLOW_REGION with the regional endpoint for deploying your Dataflow job. Select a Dataflow region close to where you run the commands in this tutorial. The value of DATAFLOW_REGION must be a valid region name. For more information about regions and locations, see Dataflow locations.
To start the Cloud Scheduler job, use the gcloud scheduler jobs run command.
gcloud scheduler jobs run --location=DATAFLOW_REGION positive-ratings-publisher
Create and run another publisher for "negative ratings" that publishes one message every two minutes.
gcloud scheduler jobs create pubsub negative-ratings-publisher \
  --schedule="*/2 * * * *" \
  --location=DATAFLOW_REGION \
  --topic="TOPIC_ID" \
  --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
Start the second Cloud Scheduler job.
gcloud scheduler jobs run --location=DATAFLOW_REGION negative-ratings-publisher
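To confirm that both publisher jobs exist, you can optionally list the Cloud Scheduler jobs in your region; this check isn't part of the required steps.
gcloud scheduler jobs list --location=DATAFLOW_REGION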
Create a BigQuery dataset
Create a BigQuery dataset and table with the appropriate schema for your Pub/Sub topic.
Console
Create a BigQuery dataset.
Open the BigQuery page in the Google Cloud console.
In the Explorer panel, select the project where you want to create the dataset.
Expand the Actions option and click Create dataset.
On the Create dataset page:
- For Dataset ID, enter tutorial_dataset.
- For Data location, choose a geographic location for the dataset. After a dataset is created, the location can't be changed.
- Do not select the other options.
- Click Create dataset.
Create a BigQuery table with a schema.
In the Explorer panel, expand your project and select your tutorial_dataset dataset.
Expand the Actions option and click Open.
In the details panel, click Create table.
On the Create table page, in the Source section, select Empty table.
On the Create table page, in the Destination section:
- Verify that Dataset name is set to tutorial_dataset.
- In the Table name field, enter tutorial.
- Verify that Table type is set to Native table.
In the Schema section, enter the schema definition. Enable Edit as text and enter the following table schema as a JSON array.
[ { "mode": "NULLABLE", "name": "url", "type": "STRING" }, { "mode": "NULLABLE", "name": "review", "type": "STRING" } ]
For Partition and cluster settings, leave the default value, No partitioning.
In the Advanced options section, for Encryption, leave the default value, Google-managed key. By default, Google Cloud encrypts customer content stored at rest.
Click Create table.
gcloud
Use the bq mk command to create the dataset.
bq --location=DATAFLOW_REGION mk \
  PROJECT_ID:tutorial_dataset
Replace PROJECT_ID with the project ID of your project.
Use the bq mk command with the --table or -t flag to create a table in your dataset.
bq mk \
--table \
PROJECT_ID:tutorial_dataset.tutorial \
url:STRING,review:STRING
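Optionally, confirm that the table was created with the expected columns; this check isn't required for the tutorial.
bq show --schema --format=prettyjson PROJECT_ID:tutorial_dataset.tutorial
The output should list the url and review fields, both of type STRING.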
Run the pipeline
Run a streaming pipeline using the Google-provided Pub/Sub Topic to BigQuery template. The pipeline gets incoming data from the Pub/Sub topic and outputs the data to your BigQuery dataset.
Console
In the Google Cloud console, go to the Dataflow Jobs page.
Click Create job from template.
Enter a Job name for your Dataflow job.
For Dataflow template, select the Pub/Sub Topic to BigQuery template.
For Input Pub/Sub topic, enter the following:
projects/PROJECT_ID/topics/TOPIC_ID
Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset and TOPIC_ID with the name of your Pub/Sub topic.
For BigQuery output table, enter the following:
PROJECT_ID:tutorial_dataset.tutorial
For Temporary location, enter the following:
gs://BUCKET_NAME/temp/
Replace BUCKET_NAME with the name of your Cloud Storage bucket. The temp folder stores temporary files, like the staged pipeline job.
Click Run job.
gcloud
To run the template in your shell or terminal, use the gcloud dataflow jobs run command.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial,\
outputDeadletterTable=PROJECT_ID:tutorial_dataset.tutorial
Replace JOB_NAME with a unique name of your choice. Also replace DATAFLOW_REGION, BUCKET_NAME, PROJECT_ID, and TOPIC_ID with the values that you used earlier in this tutorial.
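Optionally, after submitting the job you can confirm that it appears as an active streaming job; it can take a minute or two for the job to start.
gcloud dataflow jobs list --region DATAFLOW_REGION --status=active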
View your results
View the data written to your BigQuery table.
Console
In the Google Cloud console, go to the BigQuery page.
In the query editor, run the following query:
SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial` LIMIT 1000
It can take up to a minute for data to start appearing in your table.
The query returns rows that have been added to your table in the past 24 hours. You can also run queries using standard SQL.
gcloud
Check the results in BigQuery by running the following query:
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.tutorial_dataset.tutorial"'`'
While this pipeline is running, you can see new rows appended into the BigQuery table every minute.
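As an optional variation on the query above, you can aggregate the ratings to see how many positive and negative reviews the pipeline has written so far; the alias total below is only illustrative.
bq query --use_legacy_sql=false 'SELECT review, COUNT(*) AS total FROM `PROJECT_ID.tutorial_dataset.tutorial` GROUP BY review'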
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Delete the individual resources
Stop the Dataflow pipeline
Console
In the Google Cloud console, go to the Dataflow Jobs page.
Click the job that you want to stop.
To stop a job, the job status must be Running.
In the job details page, click Stop.
Click Cancel.
To confirm your choice, click Stop Job.
gcloud
To cancel your Dataflow job, use the gcloud dataflow jobs list and gcloud dataflow jobs cancel commands.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
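Optionally, list the active jobs again to confirm that the cancellation finished; the job should no longer appear once cancellation completes.
gcloud dataflow jobs list --region "DATAFLOW_REGION" --status=active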
Clean up Google Cloud project resources
Console
- Delete the Cloud Scheduler jobs.
  - Go to the Cloud Scheduler page in the Google Cloud console.
  - Select your jobs.
  - Click the Delete button at the top of the page and confirm the deletion.
- Delete the Pub/Sub topic and subscription.
  - Go to the Pub/Sub topics page in the Google Cloud console.
  - Select the topic that you created.
  - Click Delete to permanently delete the topic.
  - Go to the Pub/Sub subscriptions page in the Google Cloud console.
  - Select the subscription created with your topic.
  - Click Delete to permanently delete the subscription.
- Delete the BigQuery table and dataset.
  - In the Google Cloud console, go to the BigQuery page.
  - In the Explorer panel, expand your project.
  - Next to the dataset that you want to delete, click View actions, and then click Delete.
- Delete the Cloud Storage bucket.
  - In the Google Cloud console, go to the Cloud Storage page.
  - Select the bucket that you want to delete, click Delete, and then follow the instructions.
gcloud
To delete the Cloud Scheduler jobs, use the gcloud scheduler jobs delete command.
gcloud scheduler jobs delete negative-ratings-publisher --location=DATAFLOW_REGION
gcloud scheduler jobs delete positive-ratings-publisher --location=DATAFLOW_REGION
To delete the Pub/Sub subscription and topic, use the gcloud pubsub subscriptions delete and gcloud pubsub topics delete commands.
gcloud pubsub subscriptions delete SUBSCRIPTION_ID
gcloud pubsub topics delete TOPIC_ID
To delete the BigQuery table, use the bq rm command.
bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Delete the BigQuery dataset. The dataset alone does not incur any charges.
The following command also deletes all tables in the dataset. The tables and data cannot be recovered.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
To delete the Cloud Storage bucket, use the gsutil rm command. The bucket alone does not incur any charges.
The following command also deletes all objects in the bucket. These objects cannot be recovered.
gsutil rm -r gs://BUCKET_NAME
What's next
- Extend your Dataflow template with UDFs.
- Learn more about using Dataflow templates.
- Look through all the Google-provided templates.
- Read about using Pub/Sub to create and use topics and to create and use subscriptions.
- Learn more about using Cloud Scheduler to schedule and run cron jobs.
- Read about using BigQuery to create datasets.
- Explore reference architectures, diagrams, tutorials, and best practices about Google Cloud. Take a look at our Cloud Architecture Center.