Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:
- Read messages published to a Pub/Sub topic
- Window (or group) the messages by timestamp
- Write the messages to Cloud Storage
This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported.
You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.
- Enable the Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine APIs.
- Create a service account:
  - In the Cloud Console, go to the Create service account page.
  - Select a project.
  - In the Service account name field, enter a name. The Cloud Console fills in the Service account ID field based on this name.
    In the Service account description field, enter a description. For example, Service account for quickstart.
  - Click Create.
  - Click the Select a role field. Under Quick access, click Basic, then click Owner.
  - Click Continue.
  - Click Done to finish creating the service account.
    Do not close your browser window. You will use it in the next step.
- Create a service account key:
  - In the Cloud Console, click the email address for the service account that you created.
  - Click Keys.
  - Click Add key, then click Create new key.
  - Click Create. A JSON key file is downloaded to your computer.
  - Click Close.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your service account key. This variable applies only to your current shell session, so if you open a new session, set the variable again. (You can verify that the credentials resolve with the optional Python check at the end of this section.)
- Install and initialize the Cloud SDK.
- Create variables for your bucket, project, topic, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart.
BUCKET_NAME=your-bucket-name
PROJECT_ID=$(gcloud config get-value project)
TOPIC_ID=your-topic-id
REGION=dataflow-region
- Create a Cloud Storage bucket owned by this project:
gsutil mb gs://$BUCKET_NAME
- Create a Pub/Sub topic in this project:
gcloud pubsub topics create $TOPIC_ID
- Create a Cloud Scheduler job in this project. The job publishes a message to a Pub/Sub topic at one-minute intervals.
If an App Engine app does not exist for the project, this step will create one.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
  --topic=$TOPIC_ID --message-body="Hello!"
Start the job.
gcloud scheduler jobs run publisher-job
- Use the following commands to clone the quickstart repository and navigate to the sample code directory:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -r requirements.txt  # Install Apache Beam dependencies
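Optionally, before running any of the samples, you can check that the key file referenced by GOOGLE_APPLICATION_CREDENTIALS is picked up. This short Python check is not part of the quickstart; it assumes only the google-auth library, which is installed with the Apache Beam GCP dependencies above:

import google.auth

# Application Default Credentials read GOOGLE_APPLICATION_CREDENTIALS when it is set;
# this raises DefaultCredentialsError if no credentials can be found.
credentials, project_id = google.auth.default()
print(f"Loaded credentials for project: {project_id}")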
Stream messages from Pub/Sub to Cloud Storage
Code sample
This sample code uses Dataflow to:
- Read Pub/Sub messages.
- Window (or group) messages into fixed-size intervals by publish timestamps.
- Write the messages in each window to files in Cloud Storage.
Java
The Java sample is the PubSubToGcs class in the java-docs-samples repository you cloned earlier.
Python
The Python sample is PubSubToGCS.py in the python-docs-samples repository you cloned earlier.
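As a rough orientation while you look at those files, the following is a minimal Python sketch of the same read-window-write pattern. The names WriteWindowToGcs and run, and the random sharding key, are illustrative assumptions rather than the published sample code:

import random

import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class WriteWindowToGcs(beam.DoFn):
    """Writes one shard of a window's messages to a file in Cloud Storage."""

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, shard_and_messages, window=beam.DoFn.WindowParam):
        shard_id, messages = shard_and_messages
        # Name the file after the window boundaries, e.g. output-22:30-22:32-0.
        window_start = window.start.to_utc_datetime().strftime("%H:%M")
        window_end = window.end.to_utc_datetime().strftime("%H:%M")
        filename = f"{self.output_path}-{window_start}-{window_end}-{shard_id}"
        with GcsIO().open(filename=filename, mode="w") as f:
            for message in messages:
                f.write(message + b"\n")


def run(input_topic, output_path, window_size=2.0, num_shards=2, pipeline_args=None):
    # streaming=True is required for an unbounded Pub/Sub source.
    options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Messages arrive as bytes, timestamped with their publish time.
            | "Read from Pub/Sub" >> ReadFromPubSub(topic=input_topic)
            # Group into fixed windows of window_size minutes.
            | "Window into fixed intervals" >> beam.WindowInto(FixedWindows(int(window_size * 60)))
            # Spread each window across num_shards output files.
            | "Assign a random shard key" >> beam.WithKeys(lambda _: random.randint(0, num_shards - 1))
            | "Group messages per window and shard" >> beam.GroupByKey()
            | "Write one file per window and shard" >> beam.ParDo(WriteWindowToGcs(output_path))
        )

In the actual samples, these parameters come from command-line flags such as --input_topic, --output_path, --window_size, and --num_shards, as shown in the run commands below.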
Start the pipeline
To start the pipeline, run the following command:
Java
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"
Python
python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp
The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.
Observe job and pipeline progress
You can observe the job's progress in the Dataflow console.
Open the job details view to see:
- Job structure
- Job logs
- Stage metrics
You may have to wait a few minutes to see the output files in Cloud Storage.
Alternatively, use the following command to check which files have been written out.
gsutil ls gs://${BUCKET_NAME}/samples/
The output should look like the following:
Java
gs://${BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://${BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://${BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://${BUCKET_NAME}/samples/output-22:30-22:32-0
gs://${BUCKET_NAME}/samples/output-22:30-22:32-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0
gs://${BUCKET_NAME}/samples/output-22:32-22:34-1
Cleanup
Delete the Cloud Scheduler job.
gcloud scheduler jobs delete publisher-job
In the Dataflow console, stop the job. Cancel the pipeline without draining it.
Delete the topic.
gcloud pubsub topics delete $TOPIC_ID
Delete the files created by the pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Remove the Cloud Storage bucket.
gsutil rb gs://${BUCKET_NAME}
What's next
- If you would like to window Pub/Sub messages by a custom timestamp, you can specify the timestamp as an attribute in the Pub/Sub message, and then use the custom timestamp with PubsubIO's withTimestampAttribute (see the sketch after this list).
- Take a look at Google's open-source Dataflow templates designed for streaming.
- Read more about how Dataflow integrates with Pub/Sub.
- Check out this tutorial that reads from Pub/Sub and writes to BigQuery using Dataflow Flex templates.
- For more about windowing, see the Apache Beam Mobile Gaming Pipeline example.
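For the Python SDK, the counterpart of withTimestampAttribute is the timestamp_attribute parameter of ReadFromPubSub. A minimal sketch, assuming the publisher sets an attribute named ts (an example name, not defined by this quickstart) containing an RFC 3339 timestamp or milliseconds since the Unix epoch:

import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Use the "ts" attribute, rather than the publish time, as each element's event timestamp.
        | ReadFromPubSub(
            topic="projects/PROJECT_ID/topics/TOPIC_ID",  # placeholder topic path
            timestamp_attribute="ts",
        )
        # Fixed windows are now assigned from the custom timestamps.
        | beam.WindowInto(FixedWindows(2 * 60))
    )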