Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:
- Read messages published to a Pub/Sub topic
- Window (or group) the messages by timestamp
- Write the messages to Cloud Storage
This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported.
You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
Before you begin
- Follow the instructions for installing and initializing the Cloud SDK.
- Enable billing for your project.
To complete this quickstart, you need to enable the following APIs: Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.
It might take a few moments before the APIs appear in the console.
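If you prefer the command line, the same APIs can be enabled with gcloud services enable; the service IDs below correspond to the products listed above (Google Cloud's operations suite is Cloud Logging and Cloud Monitoring):
gcloud services enable compute.googleapis.com logging.googleapis.com \
    monitoring.googleapis.com storage-component.googleapis.com \
    storage-api.googleapis.com pubsub.googleapis.com cloudscheduler.googleapis.com \
    cloudresourcemanager.googleapis.com appengine.googleapis.com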
Create a service account key:
- From the Service account list, select New service account.
- Enter a name in the Service account name field.
- From the Role list, select Project > Owner.
- Click Create.
The key is sent to your browser's default downloads folder.
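Equivalently, the account and key can be created from the command line. This is only a sketch: the service account name dataflow-quickstart and PROJECT_ID are placeholders.
gcloud iam service-accounts create dataflow-quickstart
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:dataflow-quickstart@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/owner"
gcloud iam service-accounts keys create path/to/my/credentials.json \
    --iam-account="dataflow-quickstart@PROJECT_ID.iam.gserviceaccount.com"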
Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the service account key.
export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
Create variables for your bucket, project, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart.
BUCKET_NAME=BUCKET_NAME
PROJECT_NAME=$(gcloud config get-value project)
REGION=DATAFLOW_REGION
Create a Cloud Storage bucket owned by this project:
gsutil mb gs://$BUCKET_NAME
Create a Pub/Sub topic in this project:
gcloud pubsub topics create cron-topic
Create a Cloud Scheduler job in this project. The job publishes a message to a Cloud Pub/Sub topic at one-minute intervals.
If an App Engine app does not exist for the project, this step will create one.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
    --topic=cron-topic --message-body="Hello!"
Start the job.
gcloud scheduler jobs run publisher-job
Use the following command to clone the quickstart repository and navigate to the sample code directory:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -r requirements.txt  # Install Apache Beam dependencies
Stream messages from Pub/Sub to Cloud Storage
Code sample
This sample code uses Dataflow to:
- Read Pub/Sub messages.
- Window (or group) messages into fixed-size intervals by publish timestamps.
- Write the messages in each window to files in Cloud Storage.
Java: the PubSubToGcs class (com.examples.pubsub.streaming.PubSubToGcs) in the cloned java-docs-samples repository under pubsub/streaming-analytics.
Python: PubSubToGCS.py in the cloned python-docs-samples repository under pubsub/streaming-analytics.
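As a rough illustration of the pipeline's shape only, here is a simplified Apache Beam Python sketch. It is not the repository sample: the single constant key, the output file naming, and the FileSystems-based write are assumptions made for brevity.
import argparse

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class WriteWindowToGcs(beam.DoFn):
    """Writes all messages of one window to a single text file in Cloud Storage."""

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=beam.DoFn.WindowParam):
        _, messages = key_value
        start = window.start.to_utc_datetime().strftime("%H:%M")
        end = window.end.to_utc_datetime().strftime("%H:%M")
        # Produces names like gs://BUCKET_NAME/samples/output-22:30-22:32
        f = FileSystems.create(f"{self.output_path}-{start}-{end}")
        try:
            f.write("\n".join(messages).encode("utf-8"))
        finally:
            f.close()


def run(input_topic, output_path, window_size_minutes, pipeline_args):
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Each element carries its Pub/Sub publish time as the event timestamp.
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
            # Group messages into fixed windows by those timestamps.
            | "Window" >> beam.WindowInto(FixedWindows(window_size_minutes * 60))
            # Collect each window's messages into one batch via a constant key.
            | "Add key" >> beam.WithKeys(lambda _: 0)
            | "Group" >> beam.GroupByKey()
            | "Write to Cloud Storage" >> beam.ParDo(WriteWindowToGcs(output_path))
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", required=True)
    parser.add_argument("--output_path", required=True)
    parser.add_argument("--window_size", type=int, default=2)
    args, beam_args = parser.parse_known_args()
    run(args.input_topic, args.output_path, args.window_size, beam_args)
With --window_size=2, each output file covers a two-minute window, which matches the file listing shown later in this quickstart.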
Start the pipeline
To start the pipeline, run the following command:
Java
mvn compile exec:java \
    -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
    -Dexec.cleanupDaemonThreads=false \
    -Dexec.args=" \
    --project=$PROJECT_NAME \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"
Python
python PubSubToGCS.py \
    --project=$PROJECT_NAME \
    --region=$REGION \
    --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
    --output_path=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --window_size=2 \
    --temp_location=gs://$BUCKET_NAME/temp
The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.
Observe job and pipeline progress
You can observe the job's progress in the Dataflow console.
Open the job details view to see:
- Job structure
- Job logs
- Stage metrics
You may have to wait a few minutes to see the output files in Cloud Storage.
Alternatively, use the command line below to check which files have been written out.
gsutil ls gs://${BUCKET_NAME}/samples/
The output should look like the following:
Java
gs://${BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://${BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://${BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://${BUCKET_NAME}/samples/output-22:30-22:32
gs://${BUCKET_NAME}/samples/output-22:32-22:34
gs://${BUCKET_NAME}/samples/output-22:34-22:36
gs://${BUCKET_NAME}/samples/output-22:36-22:38
Cleanup
Delete the Cloud Scheduler job.
gcloud scheduler jobs delete publisher-job
In the Dataflow console, stop the job. Cancel the pipeline without draining it.
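If you prefer the command line, you can also list and cancel the job with gcloud; JOB_ID below is a placeholder for the ID shown by the list command or in the console:
gcloud dataflow jobs list --region=$REGION
gcloud dataflow jobs cancel JOB_ID --region=$REGION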
Delete the topic.
gcloud pubsub topics delete cron-topic
Delete the files created by the pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Remove the Cloud Storage bucket.
gsutil rb gs://${BUCKET_NAME}
What's next
- If you would like to window Pub/Sub messages by a custom timestamp, you can specify the timestamp as an attribute in the Pub/Sub message and then use that custom timestamp with PubsubIO's withTimestampAttribute (a Python sketch follows this list).
- Take a look at Google's open-source Dataflow templates designed for streaming.
- Read more about how Dataflow integrates with Pub/Sub.
- For more about windowing, see the Apache Beam Mobile Gaming Pipeline example.
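In the Python SDK, the counterpart of withTimestampAttribute is the timestamp_attribute parameter of beam.io.ReadFromPubSub. A minimal sketch, assuming a hypothetical attribute named ts that publishers set on each message:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # timestamp_attribute makes Beam take each element's event timestamp
        # from the named message attribute instead of the publish time.
        | "Read" >> beam.io.ReadFromPubSub(
            topic="projects/PROJECT_NAME/topics/cron-topic",
            timestamp_attribute="ts",  # hypothetical attribute name
        )
        | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
        # Downstream windowing now groups by the custom timestamps.
        | "Window" >> beam.WindowInto(FixedWindows(2 * 60))
        | "Print" >> beam.Map(print)
    )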