Quickstart: stream processing with Dataflow

Dataflow is a fully managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:

  • Read messages published to a Pub/Sub topic
  • Window (or group) the messages by timestamp
  • Write the messages to Cloud Storage

This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported.

You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.

Before you begin

  1. Follow the instructions for installing and initializing the Cloud SDK.
  2. Enable billing for your project.
  3. To complete this quickstart, you need to enable the following APIs: Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Enable the APIs

    It might take a few moments before the APIs appear in the console.

  4. Create a service account key:

    Create a service account key

    1. From the Service account list, select New service account.
    2. Enter a name in the Service account name field.
    3. From the Role list, select Project > Owner.
    4. Click Create.

    The key is sent to your browser's default downloads folder.

  5. Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the service account key.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
  6. Create variables for your bucket, project, and region. Cloud Storage bucket names must be globally unique. Select a Dataflow region close to where you run the commands in this quickstart.

    BUCKET_NAME=your-bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    REGION=your-dataflow-region
  7. Create a Cloud Storage bucket owned by this project:

    gsutil mb gs://$BUCKET_NAME
  8. Create a Pub/Sub topic in this project:

    gcloud pubsub topics create cron-topic
  9. Create a Cloud Scheduler job in this project. The job publishes a message to a Cloud Pub/Sub topic at one-minute intervals.

    If an App Engine app does not exist for the project, this step will create one.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"

    Start the job.

    gcloud scheduler jobs run publisher-job
  10. Use one of the following sets of commands to clone the quickstart repository and navigate to the sample code directory.

    Java:

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
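The Cloud Scheduler job created above uses the standard unix-cron schedule format, where `* * * * *` means "every minute". As an illustration only, a toy stdlib-based matcher for a simplified five-field cron expression (supporting just `*` and plain numbers, and ignoring cron's Sunday-based weekday numbering) might look like this; the function name and example times are invented for this sketch:

```python
import datetime

def cron_matches(expr, dt):
    """Check a simplified 5-field cron expression (minute hour day month
    weekday) against a datetime. Supports only "*" and single numbers;
    real cron also allows ranges, lists, and steps."""
    fields = expr.split()
    values = [dt.minute, dt.hour, dt.day, dt.month, dt.weekday()]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

now = datetime.datetime(2020, 1, 1, 22, 30)
print(cron_matches("* * * * *", now))    # True: fires every minute
print(cron_matches("30 22 * * *", now))  # True: fires daily at 22:30
print(cron_matches("0 9 * * *", now))    # False: fires daily at 09:00
```

This is only to clarify the `--schedule` flag's format; Cloud Scheduler does the real evaluation server-side.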

Stream messages from Pub/Sub to Cloud Storage

Code sample

This sample code uses Dataflow to:

  • Read Pub/Sub messages.
  • Window (or group) messages into fixed-size intervals by publish timestamps.
  • Write the messages in each window to files in Cloud Storage.
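The fixed-window grouping the samples below perform with Beam's `FixedWindows` can be sketched with the standard library alone: each element lands in the non-overlapping interval whose start is its timestamp rounded down to a multiple of the window size. This is an illustrative sketch, not Beam's implementation; the function name and the example timestamps are invented:

```python
from collections import defaultdict

def assign_fixed_windows(messages, window_size_secs):
    """Group (timestamp, body) pairs into fixed, non-overlapping windows.

    Each element falls in the window [start, start + window_size_secs)
    where start = timestamp - (timestamp % window_size_secs).
    """
    windows = defaultdict(list)
    for ts, body in messages:
        window_start = ts - (ts % window_size_secs)
        windows[window_start].append(body)
    return dict(windows)

# Four messages bucketed into one-minute (60-second) windows:
messages = [(0, "a"), (59, "b"), (60, "c"), (125, "d")]
print(assign_fixed_windows(messages, 60))
# {0: ['a', 'b'], 60: ['c'], 120: ['d']}
```

In the real pipeline, Beam assigns these windows from each message's Pub/Sub publish timestamp and handles out-of-order arrival via watermarks and triggers.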


Java:

import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python:

import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """
        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }

class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )

if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        'In the format "projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Start the pipeline

To start the pipeline, run the following command:


Java:

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2 \
    --tempLocation=gs://$BUCKET_NAME/temp"


Python:

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --region=$REGION \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp

The preceding command runs locally and launches a Dataflow job that runs in the cloud. When the command returns JOB_MESSAGE_DETAILED: Workers have started successfully, exit the local program using Ctrl+C.

Observe job and pipeline progress

You can observe the job's progress in the Dataflow console.

Go to the Dataflow console

Observe the job's progress

Open the job details view to see:

  • Job structure
  • Job logs
  • Stage metrics


You may have to wait a few minutes to see the output files in Cloud Storage.


Alternatively, use the command line below to check which files have been written out.

gsutil ls gs://${BUCKET_NAME}/samples/

The output lists one file per window, named with the window's start and end times (for example, output-22:30-22:31).

Clean up

To avoid incurring charges to your Google Cloud account, clean up the resources you created:

  1. Delete the Cloud Scheduler job.

    gcloud scheduler jobs delete publisher-job
  2. In the Dataflow console, stop the job. Cancel the pipeline without draining it.

  3. Delete the topic.

    gcloud pubsub topics delete cron-topic
  4. Delete the files created by the pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  5. Remove the Cloud Storage bucket.

    gsutil rb gs://${BUCKET_NAME}

What's next