Stream messages from Pub/Sub by using Dataflow
Dataflow is a fully managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:
- Read messages published to a Pub/Sub topic
- Window (or group) the messages by timestamp
- Write the messages to Cloud Storage
This quickstart introduces you to using Dataflow in Java and Python; SQL is also supported. It is also offered as a Google Cloud Skills Boost tutorial, which provides temporary credentials to get you started.
You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.
- Enable the Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine APIs.
- Create a service account:
- In the console, go to the Create service account page.
- Select your project.
- In the Service account name field, enter a name. The console fills in the Service account ID field based on this name.
- In the Service account description field, enter a description. For example, Service account for quickstart.
- Click Create and continue.
- To provide access to your project, grant the following roles to your service account: Dataflow Admin, Pub/Sub Admin, and Cloud Storage Object Admin. In the Select a role list, select a role. For additional roles, click Add another role and add each additional role.
- Click Continue.
- Click Done to finish creating the service account. Do not close your browser window. You will use it in the next step.
- Create a service account key:
- In the console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
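If you prefer the command line, the service account, role grants, and key from the preceding console steps can also be created with the gcloud CLI (installed in a later step). The following is a sketch, where quickstart-sa is an illustrative account name and PROJECT_ID is a placeholder for your project ID:
gcloud iam service-accounts create quickstart-sa
# Grant the same roles as in the console steps.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:quickstart-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/dataflow.admin"
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:quickstart-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.admin"
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:quickstart-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
# Create and download a JSON key for the account.
gcloud iam service-accounts keys create key.json \
    --iam-account=quickstart-sa@PROJECT_ID.iam.gserviceaccount.com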
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
- Install and initialize the Google Cloud CLI.
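For example, to set GOOGLE_APPLICATION_CREDENTIALS in a bash or zsh shell, you might run the following (the key file path is illustrative, not part of this quickstart):
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-key.json"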
- Create variables for your bucket, project, topic, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart. The value of the REGION variable must be a valid region name. For more information about regions and locations, see Dataflow locations.
BUCKET_NAME=your-bucket-name
PROJECT_ID=$(gcloud config get-value project)
TOPIC_ID=your-topic-id
REGION=dataflow-region
- Create a Cloud Storage bucket owned by this project:
gsutil mb gs://$BUCKET_NAME
- Create a Pub/Sub topic in this project:
gcloud pubsub topics create $TOPIC_ID
- Create a Cloud Scheduler job in this project. The job publishes a message to a Pub/Sub topic at one-minute intervals. If an App Engine app does not exist for the project, this step creates one.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
    --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Start the job.
gcloud scheduler jobs run publisher-job --location=$REGION
- Use the following commands to clone the quickstart repository and navigate to the sample code directory:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -r requirements.txt  # Install Apache Beam dependencies
Stream messages from Pub/Sub to Cloud Storage
Code sample
This sample code uses Dataflow to:
- Read Pub/Sub messages.
- Window (or group) messages into fixed-size intervals by publish timestamps.
- Write the messages in each window to files in Cloud Storage.
The Java sample (the PubSubToGcs class) and the Python sample (PubSubToGCS.py) are in the repositories you cloned earlier.
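As an outline, the following is a minimal Python sketch of the same pipeline shape. It is illustrative rather than the exact sample code: the transform labels, the random shard key, and the WriteWindowToGcs DoFn are assumptions of this sketch, but the read-window-group-write structure matches the description above.
import argparse
import random

import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class WriteWindowToGcs(beam.DoFn):
    """Writes the messages of one (shard, window) group to a Cloud Storage object."""

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, shard_and_messages, window=beam.DoFn.WindowParam):
        shard, messages = shard_and_messages
        start = window.start.to_utc_datetime().strftime("%H:%M")
        end = window.end.to_utc_datetime().strftime("%H:%M")
        # Produces names like gs://BUCKET/samples/output-22:30-22:32-0.
        filename = f"{self.output_path}-{start}-{end}-{shard}"
        with GcsIO().open(filename, mode="w") as f:
            for message in messages:
                f.write(f"{message}\n".encode("utf-8"))


def run(input_topic, output_path, window_size, num_shards, pipeline_args):
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Each element gets its Pub/Sub publish time as the event timestamp.
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
            # Group (window) messages into fixed intervals of window_size minutes.
            | "Window" >> beam.WindowInto(FixedWindows(window_size * 60))
            # Key each message by a random shard so each window yields num_shards files.
            | "Add shard key" >> beam.Map(
                lambda msg: (random.randint(0, num_shards - 1), msg))
            | "Group by shard" >> beam.GroupByKey()
            | "Write to Cloud Storage" >> beam.ParDo(WriteWindowToGcs(output_path))
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", required=True)
    parser.add_argument("--output_path", required=True)
    parser.add_argument("--window_size", type=int, default=2)
    parser.add_argument("--num_shards", type=int, default=2)
    args, pipeline_args = parser.parse_known_args()
    run(args.input_topic, args.output_path, args.window_size, args.num_shards,
        pipeline_args)
The actual sample wires the same options to the command-line flags used in the next section (--input_topic, --output_path, --window_size, --num_shards).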
Start the pipeline
To start the pipeline, run the following command:
Java
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2"
Python
python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp
The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.
Observe job and pipeline progress
You can observe the job's progress in the Dataflow console.
Open the job details view to see:
- Job structure
- Job logs
- Stage metrics
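If you prefer the command line, you can also confirm that the job is running with the gcloud CLI (an optional alternative to the console view):
gcloud dataflow jobs list --region=$REGION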
You may have to wait a few minutes to see the output files in Cloud Storage.
Alternatively, use the following command to check which files have been written out.
gsutil ls gs://${BUCKET_NAME}/samples/
The output should look like the following:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.
Delete the Cloud Scheduler job.
gcloud scheduler jobs delete publisher-job --location=$REGION
In the Dataflow console, stop the job. Cancel the pipeline without draining it.
Delete the topic.
gcloud pubsub topics delete $TOPIC_ID
Delete the files created by the pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Remove the Cloud Storage bucket.
gsutil rb gs://${BUCKET_NAME}
What's next
- If you would like to window Pub/Sub messages by a custom timestamp, you can specify the timestamp as an attribute in the Pub/Sub message and then use the custom timestamp with PubsubIO's withTimestampAttribute. (A Python sketch of the equivalent option follows this list.)
- Take a look at Google's open-source Dataflow templates designed for streaming.
- Read more about how Dataflow integrates with Pub/Sub.
- Check out this tutorial that reads from Pub/Sub and writes to BigQuery using Dataflow Flex templates.
- For more about windowing, see the Apache Beam Mobile Gaming Pipeline example.
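As mentioned in the first item above, messages can be windowed by a custom timestamp. In the Python SDK, the analogous option is the timestamp_attribute parameter of ReadFromPubSub. The following is a minimal sketch, assuming the publisher sets an attribute named ts (an illustrative name) containing an RFC 3339 timestamp or milliseconds since the epoch:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# With timestamp_attribute set, Beam uses the "ts" attribute (illustrative name)
# as each element's event timestamp instead of the Pub/Sub publish time, so
# downstream windowing follows the custom timestamp.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as pipeline:
    messages = pipeline | "Read" >> beam.io.ReadFromPubSub(
        topic="projects/your-project-id/topics/your-topic-id",
        timestamp_attribute="ts",
    )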