Pub/Sub I/O

PubsubIO contains the built-in Read and Write transforms for Google Cloud Pub/Sub. You can use PubsubIO to read data from a Pub/Sub topic or subscription, and to write data to a Pub/Sub topic.

Note: By default, PubsubIO.Read produces an unbounded PCollection. Before you apply any transform that groups elements, such as GroupByKey or Combine, you should apply a windowing strategy to that PCollection.

Specifying a Pub/Sub Topic or Subscription

To use PubsubIO, you provide either the name of a Pub/Sub topic or the name of a subscription that you've already created for that topic. If you read using a topic name, Dataflow automatically creates and manages a subscription for you behind the scenes.

Specifying a Pub/Sub Topic

When you use PubsubIO, you can specify a Pub/Sub topic to read from or write to. When you read from a topic, Dataflow automatically creates a subscription to that topic and begins reading at the time your pipeline starts; any data published to the topic before your pipeline actually starts is not available to your pipeline.

Note: Dataflow creates the necessary Pub/Sub subscription as part of your pipeline's setup. Because pipeline setup time varies with the number of Compute Engine instances and other Cloud Platform resources allocated, it can be difficult to know exactly when your pipeline begins reading from the Pub/Sub topic. If you need finer control over when reading starts, you can create and manage your own subscription to the Pub/Sub topic and pass that subscription to PubsubIO. See Specifying a Pub/Sub Subscription for more information.

The topic name you provide should follow the format projects/<Cloud Platform Project ID>/topics/<topic name>, where the project ID identifies the project that owns the topic. In addition, the topic name must satisfy the following requirements:

  • The topic name must be between 3 and 255 characters in length.
  • The topic name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
  • The topic name must begin with a letter.
  • The topic name must end with either a letter or a number.
  • The topic name cannot begin with the prefix "goog".
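The format and naming rules above are easy to check before you launch a pipeline. The following is a minimal sketch, not part of the Dataflow SDK: TopicNameCheck and its methods are hypothetical names, the check covers only the rules listed here, and the project ID segment is only checked for being non-empty.

```java
import java.util.regex.Pattern;

// Hypothetical helper that checks a topic path against the rules listed above.
final class TopicNameCheck {
    // Lowercase letters, digits, '-', '_' and '.'; must start with a letter
    // and end with a letter or a digit.
    private static final Pattern NAME =
        Pattern.compile("[a-z][a-z0-9._-]*[a-z0-9]");

    static boolean isValidName(String name) {
        return name.length() >= 3
            && name.length() <= 255
            && !name.startsWith("goog")
            && NAME.matcher(name).matches();
    }

    // Expects "projects/<project ID>/topics/<topic name>".
    static boolean isValidTopicPath(String path) {
        String[] parts = path.split("/", -1);
        return parts.length == 4
            && parts[0].equals("projects")
            && !parts[1].isEmpty()          // project ID: only checked for presence
            && parts[2].equals("topics")
            && isValidName(parts[3]);
    }
}
```

For example, isValidTopicPath("projects/my-project/topics/my-topic") passes, while a name such as "google-topic" is rejected because of its "goog" prefix.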

Specifying a Pub/Sub Subscription

When you use PubsubIO.Read, you can specify an existing Pub/Sub subscription to read from instead of a topic. You must create and manage the subscription yourself; see the Pub/Sub Subscriber Documentation for more information on creating a Pub/Sub subscription.

The subscription name you provide should follow the format projects/<Cloud Platform Project ID>/subscriptions/<subscription name>, where the project ID identifies the project that owns the subscription. In addition, the subscription name must meet the following requirements:

  • The subscription name must be between 3 and 255 characters in length.
  • The subscription name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
  • The subscription name must begin with a letter.
  • The subscription name must end with either a letter or a number.
  • The subscription name cannot begin with the prefix "goog".

Use a subscription if you want to ensure that your pipeline doesn't miss any data from your Pub/Sub topic. A Pub/Sub subscription continues to collect messages even when no pipeline is reading from it, so when your pipeline starts, it has access to the messages the subscription has retained, including those that arrived before pipeline startup. Likewise, if you stop your pipeline and start a new pipeline that reads from the same subscription, no data is lost, because the subscription keeps collecting messages while no pipeline is running.

Reading with PubsubIO

The PubsubIO.Read transform continuously reads from a Pub/Sub topic or subscription and returns an unbounded PCollection of Strings, one per message. By default, each message payload is decoded as a UTF-8 string. You can override the default coder by calling .withCoder when you construct PubsubIO.Read.

Java

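  import com.google.cloud.dataflow.sdk.Pipeline;
  import com.google.cloud.dataflow.sdk.io.PubsubIO;
  import com.google.cloud.dataflow.sdk.options.PipelineOptions;
  import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
  import com.google.cloud.dataflow.sdk.values.PCollection;

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is unbounded; apply windowing before any grouping transform.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("projects/my-project/topics/my-topic"));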

Reading a Bounded Set of Records from Pub/Sub

For testing purposes, you can read from Pub/Sub with the InProcessPipelineRunner (Dataflow SDK for Java 1.X) or the DirectRunner (Dataflow SDK for Java 2.X).

If you need to operate on a bounded collection for testing purposes, you can supply a bound on the amount of input to read: use .maxNumRecords to read a fixed maximum number of records, or .maxReadTime to read records for a fixed duration.
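To make the two bounds concrete, the following sketch drains an unbounded stream of elements until a record limit or a time limit is reached. It only illustrates the semantics described above: BoundedDrain is a hypothetical helper, not how the SDK implements .maxNumRecords or .maxReadTime. This sketch accepts both bounds and stops at whichever is reached first, and it takes the clock as a parameter so the time bound can be tested deterministically.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: reads from a (possibly unbounded) source until either
// maxNumRecords elements have been read or maxReadTime has elapsed.
final class BoundedDrain {
    static <T> List<T> drain(Iterator<T> source, int maxNumRecords,
                             Duration maxReadTime, Supplier<Instant> clock) {
        Instant deadline = clock.get().plus(maxReadTime);
        List<T> out = new ArrayList<>();
        // Stop at the record limit, at the deadline, or if the source runs dry.
        while (out.size() < maxNumRecords
               && clock.get().isBefore(deadline)
               && source.hasNext()) {
            out.add(source.next());
        }
        return out;
    }
}
```

Passing a very large value for one bound effectively leaves only the other in force, which mirrors choosing either .maxNumRecords or .maxReadTime.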

Note that deduplication, retries, and failure recovery are not guaranteed in this mode. In production, you should treat Pub/Sub as an unbounded source.

Writing with PubsubIO

The PubsubIO.Write transform continuously writes an unbounded PCollection of String objects to a Pub/Sub topic. By default, PubsubIO.Write expects String elements and encodes each one as UTF-8. You can change the expected input type and encoding by using .withCoder.

Java

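  import com.google.cloud.dataflow.sdk.io.PubsubIO;
  import com.google.cloud.dataflow.sdk.values.PCollection;

  // streamData is unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("projects/my-project/topics/my-topic"));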

Timestamps and Record IDs

You can add two types of metadata to records that you read or write using PubsubIO: timestamps and record IDs.

Using User-Specified Timestamps

You can use user-specified timestamps for precise control over how elements read from Pub/Sub are assigned to windows in a Dataflow pipeline. To use them, call timestampLabel when constructing a PubsubIO.Read or PubsubIO.Write transform, passing the name of the Pub/Sub message attribute that holds the timestamp.

If you set a timestamp label on PubsubIO.Read, the transform uses the value of the message attribute with that name as the timestamp of each incoming message. Timestamps should be either in the form of milliseconds since the Unix epoch or formatted according to RFC 3339.
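As an illustration of the two accepted timestamp formats, here is a minimal sketch that converts an attribute value into a java.time.Instant. PubsubTimestamps is a hypothetical helper, and treating an all-digit value as milliseconds since the epoch is this sketch's own way of telling the two formats apart. It returns null when the attribute is missing; what PubsubIO.Read itself does in that case is not described here.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Map;

// Hypothetical helper: interprets a timestamp attribute value that is either
// milliseconds since the Unix epoch or an RFC 3339 date-time string.
final class PubsubTimestamps {
    static Instant parse(String value) {
        if (value.matches("-?\\d+")) {
            // All digits: treat as epoch milliseconds.
            return Instant.ofEpochMilli(Long.parseLong(value));
        }
        // Otherwise parse as an RFC 3339 date-time with a "Z" or numeric offset,
        // e.g. "2015-06-01T12:00:00Z" or "2015-06-01T14:00:00+02:00".
        return OffsetDateTime.parse(value).toInstant();
    }

    // Looks up the attribute named by the timestamp label; null if absent.
    static Instant timestampOf(Map<String, String> attributes, String timestampLabel) {
        String value = attributes.get(timestampLabel);
        return value == null ? null : parse(value);
    }
}
```

Under this sketch, "1433160000000", "2015-06-01T12:00:00Z", and "2015-06-01T14:00:00+02:00" all denote the same instant.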

If you set a timestamp label on PubsubIO.Write, the transform writes each element as a Pub/Sub message with an attribute of that name, whose value is the element's timestamp in milliseconds since the Unix epoch.
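On the write side, the attribute value is simply the element's timestamp rendered as epoch milliseconds. A sketch, using a hypothetical TimestampAttributes helper that builds the attribute map for one outgoing message; it is not the SDK's own code.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: copies the existing attributes and adds the element
// timestamp, in milliseconds since the Unix epoch, under the timestamp label.
final class TimestampAttributes {
    static Map<String, String> withTimestamp(Map<String, String> attributes,
                                             String timestampLabel, Instant timestamp) {
        Map<String, String> out = new HashMap<>(attributes);
        out.put(timestampLabel, Long.toString(timestamp.toEpochMilli()));
        return out;
    }
}
```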

Using Record IDs

Record IDs allow for exactly-once processing across the boundary between Dataflow and other systems. To use record IDs, call idLabel when constructing a PubsubIO.Read or PubsubIO.Write transform, passing the name of the message attribute that holds the record ID.

If you set a record ID label on PubsubIO.Read, the transform reads each message's record ID from the attribute with that name. When Dataflow receives multiple messages with the same record ID, it discards all but one of them. However, Dataflow does not de-duplicate messages with the same record ID that are published to Pub/Sub more than 10 minutes apart.
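The de-duplication rule can be pictured as remembering each record ID for a limited time. The sketch below is a hypothetical RecordIdDeduplicator, not Dataflow's implementation: it stores the publish time of the first message seen with each ID, drops later messages carrying that ID, and forgets an ID once the current message's publish time is more than the window (10 minutes, per the text above) past that first message. It assumes messages arrive roughly in publish-time order.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of record-ID de-duplication with a bounded time window.
final class RecordIdDeduplicator {
    private final Duration window;
    // Record ID -> publish time of the first message seen with that ID.
    private final Map<String, Instant> firstSeen = new HashMap<>();

    RecordIdDeduplicator(Duration window) {
        this.window = window;
    }

    // Returns true if the message should be kept, false if it is a duplicate.
    boolean accept(String recordId, Instant publishTime) {
        evictOlderThan(publishTime.minus(window));
        if (firstSeen.containsKey(recordId)) {
            return false;
        }
        firstSeen.put(recordId, publishTime);
        return true;
    }

    // Forgets IDs whose first message is older than the cutoff.
    private void evictOlderThan(Instant cutoff) {
        Iterator<Map.Entry<String, Instant>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isBefore(cutoff)) {
                it.remove();
            }
        }
    }
}
```

For example, with a 10-minute window, a message with ID "a" at minute 0 is kept, a second "a" at minute 5 is dropped, and a third "a" at minute 11 is kept again.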

If you set a record ID label when using PubsubIO.Write, the transform will write an attribute on all outgoing messages with the specified name and a unique value. Downstream systems can use this unique value to de-duplicate messages.
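A sketch of the write side, under stated assumptions: RecordIdAttributes is a hypothetical helper, and a random UUID is used only as one convenient source of unique values; the value format that PubsubIO.Write actually produces is not specified here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: copies the existing attributes and adds a unique value
// under the record ID label, so downstream consumers can de-duplicate.
final class RecordIdAttributes {
    static Map<String, String> withRecordId(Map<String, String> attributes, String idLabel) {
        Map<String, String> out = new HashMap<>(attributes);
        out.put(idLabel, UUID.randomUUID().toString());
        return out;
    }
}
```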
