Using Pub/Sub with Dataflow

Cloud Dataflow is a fully managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to:

  • Read messages published to a Pub/Sub topic
  • Window (or buffer) the messages into batches
  • Write the messages to Cloud Storage

This quickstart introduces you to using Dataflow in Java and Python. SQL is also supported.

You can also start by using UI-based Dataflow templates if you do not intend to do custom data processing.
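
For example, the following command sketches how you might launch the prebuilt "Pub/Sub to Text Files on Cloud Storage" template with gcloud. The template path and the parameter names (inputTopic, outputDirectory, outputFilenamePrefix) are assumptions based on the public template catalog, and PROJECT_ID, TOPIC_ID, and BUCKET_NAME are placeholders; verify the details against the template documentation before running it:

# Template path and parameter names are illustrative; check the template docs.
gcloud dataflow jobs run pubsub-to-gcs \
  --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_GCS_Text \
  --parameters=inputTopic=projects/PROJECT_ID/topics/TOPIC_ID,outputDirectory=gs://BUCKET_NAME/output/,outputFilenamePrefix=output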

Before you begin

Set up your Google Cloud project

  1. Follow the instructions for installing and initializing the Cloud SDK.
  2. Enable billing for your project.
  3. To complete this quickstart, you need to enable the following APIs: Compute Engine, Stackdriver, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Enable the APIs

    It might take a few moments before the APIs appear in the console.
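
    If you prefer the command line, you can enable these APIs with gcloud instead. The service names below are an assumed mapping from the APIs listed above; adjust them if your project needs a different set:

    # Service names are assumed to correspond to the APIs listed above.
    gcloud services enable compute.googleapis.com logging.googleapis.com \
      monitoring.googleapis.com storage-component.googleapis.com \
      storage-api.googleapis.com pubsub.googleapis.com \
      cloudscheduler.googleapis.com cloudresourcemanager.googleapis.com \
      appengine.googleapis.com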

  4. Create a service account key:

    Create a service account key

    1. From the Service account list, select New service account.
    2. Enter a name in the Service account name field.
    3. From the Role list, select Project > Owner.
    4. Click Create.

    A JSON file that contains the key is downloaded to your browser's default downloads folder.
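
    Alternatively, you can create the service account, grant it the Owner role, and download a key with gcloud. This is a sketch; the account name my-dataflow-sa and the key path ~/credentials.json are placeholders:

    # my-dataflow-sa and ~/credentials.json are placeholder names.
    gcloud iam service-accounts create my-dataflow-sa
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
      --member="serviceAccount:my-dataflow-sa@$(gcloud config get-value project).iam.gserviceaccount.com" \
      --role="roles/owner"
    gcloud iam service-accounts keys create ~/credentials.json \
      --iam-account="my-dataflow-sa@$(gcloud config get-value project).iam.gserviceaccount.com"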

  5. Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the service account key.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Create variables for your bucket and project. Cloud Storage bucket names must be globally unique.

    BUCKET_NAME=bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    
  7. Create a Cloud Storage bucket owned by this project:

    gsutil mb gs://$BUCKET_NAME
    
  8. Create a Pub/Sub topic in this project:

    gcloud pubsub topics create cron-topic
    
  9. Create a Cloud Scheduler job in this project. The job publishes a message to a Cloud Pub/Sub topic at one-minute intervals.

    If an App Engine app does not exist for the project, this step will create one.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"
    

    Start the job.

    gcloud scheduler jobs run publisher-job
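
    To confirm that messages are being published, you can optionally attach a temporary subscription to the topic and pull from it after waiting a minute or so for the next scheduled message. The subscription name test-sub is a placeholder used only for this check:

    # test-sub is a placeholder subscription used only for verification.
    gcloud pubsub subscriptions create test-sub --topic=cron-topic
    gcloud pubsub subscriptions pull test-sub --auto-ack --limit=5
    gcloud pubsub subscriptions delete test-sub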
    
  10. Clone the quickstart repository and navigate to the sample code directory:

    java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Stream messages from Pub/Sub to Cloud Storage

Code sample

This sample code uses Dataflow to:

  • Read Pub/Sub messages.
  • Window (or group) messages by timestamp in 2-minute intervals.
  • Save the messages in each window as files in Cloud Storage.

java

import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.io.IOException;


public class PubSubToGCS {
  /*
  * Define your own configuration options. Add your own arguments to be processed
  * by the command-line parser, and specify default values for them.
  */
  public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();
    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGCSOptions options = PipelineOptionsFactory
      .fromArgs(args)
      .withValidation()
      .as(PubSubToGCSOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
      // 1) Read string messages from a Pub/Sub topic.
      .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      // 2) Group the messages into fixed-sized minute intervals.
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
      // 3) Write one file to GCS for every window of messages.
      .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

python

import argparse
import datetime
import json
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                # Assigns window info to each Pub/Sub message based on its
                # publish timestamp.
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                | 'Add timestamps to messages' >> (beam.ParDo(AddTimestamps()))
                # Use a dummy key to group the elements in the same window.
                # Note that all the elements in one window must fit into memory
                # for this. If the windowed elements do not fit into memory,
                # please consider using `beam.util.BatchElements`.
                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
                | 'Groupby' >> beam.GroupByKey()
                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))


class AddTimestamps(beam.DoFn):

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            'message_body': element.decode('utf-8'),
            'publish_time': datetime.datetime.utcfromtimestamp(
                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


class WriteBatchesToGCS(beam.DoFn):

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = '%H:%M'
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = '-'.join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
            for element in batch:
                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))


def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to True because some DoFns rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
         | 'Window into' >> GroupWindowsIntoBatches(window_size)
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))


if __name__ == '__main__': # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic',
        help='The Cloud Pub/Sub topic to read from.\n'
             '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".')
    parser.add_argument(
        '--window_size',
        type=float,
        default=1.0,
        help='Output file\'s window size in number of minutes.')
    parser.add_argument(
        '--output_path',
        help='GCS Path of the output file including filename prefix.')
    known_args, pipeline_args = parser.parse_known_args()

    run(known_args.input_topic, known_args.output_path, known_args.window_size,
        pipeline_args)

Start the pipeline

To start the pipeline, use the following command:

java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGCS \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

python

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp
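
Both commands submit a streaming job to the Dataflow service and then keep running locally. To confirm from the command line that the job was accepted and is running, you can list active Dataflow jobs in a separate terminal (depending on your configuration, you might also need to pass --region):

gcloud dataflow jobs list --status=active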

Observe job and pipeline progress

You can observe the job's progress in the Dataflow console.

Go to the Dataflow console

Open the job details view to see:

  • Job structure
  • Job logs
  • Stage metrics

You may have to wait a few minutes to see the output files in Cloud Storage.

Alternatively, use the following command to check which files have been written out:

gsutil ls gs://${BUCKET_NAME}/samples/

The output should look like the following:

java

gs://${BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://${BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://${BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

python

gs://${BUCKET_NAME}/samples/output-22:30-22:32
gs://${BUCKET_NAME}/samples/output-22:32-22:34
gs://${BUCKET_NAME}/samples/output-22:34-22:36
gs://${BUCKET_NAME}/samples/output-22:36-22:38
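
Each file contains the messages from one window, one message per line; the Python sample writes each message as a JSON object. To inspect the contents, you can print the files with a wildcard (the exact filenames will differ from the examples above):

gsutil cat "gs://${BUCKET_NAME}/samples/output*"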

Cleanup

  1. Delete the Cloud Scheduler job.

    gcloud scheduler jobs delete publisher-job
    
  2. Use Ctrl+C to stop the program in your terminal.

  3. In the Dataflow console, stop the job. Cancel the pipeline without draining it.
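
    You can also cancel the job from the command line: list the active jobs to find the job ID, then cancel it. Replace JOB_ID with the ID from the listing, and add --region if the job does not run in your default region:

    gcloud dataflow jobs list --status=active
    gcloud dataflow jobs cancel JOB_ID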

  4. Delete the topic.

    gcloud pubsub topics delete cron-topic
    
  5. Delete the files created by the pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    
  6. Remove the Cloud Storage bucket.

    gsutil rb gs://${BUCKET_NAME}
    
