Read and write transforms for Cloud Pub/Sub are included in
PubsubIO. You can use PubsubIO to
read data from (or write data to) a Cloud Pub/Sub topic or subscription. In addition, you can achieve
exactly-once processing of Cloud Pub/Sub message streams, as
PubsubIO can de-duplicate messages based on custom message identifiers or identifiers
assigned by Cloud Pub/Sub.
Note: By default,
PubsubIO transforms create unbounded
PCollections. After you create a
PCollection by using
PubsubIO.Read, you should apply a windowing
strategy to that
PCollection before applying any transforms that group elements,
such as GroupByKey or Combine.
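As a minimal sketch of that note, assuming the Dataflow SDK for Java 1.x and an unbounded PCollection<String> named streamData produced by PubsubIO.Read (the window size is an arbitrary example), a windowing strategy might be applied like this:

```java
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;

// Sketch: assign one-minute fixed windows to the unbounded PCollection
// before applying any grouping transform such as GroupByKey.
// Assumes streamData is an unbounded PCollection<String> from PubsubIO.Read.
PCollection<String> windowed = streamData.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
```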
Specifying a Cloud Pub/Sub Topic or Subscription
When you use
PubsubIO, you can provide either the name of a Cloud Pub/Sub topic, or a
subscription that you've already created for a given topic. If you use a
PubsubIO.Read transform with a topic name, Dataflow automatically creates and manages a subscription for you
behind the scenes.
Specifying a Cloud Pub/Sub Topic
When you use
PubsubIO, you can choose to specify a Cloud Pub/Sub topic to read from or
write to. When you provide a topic name, Dataflow automatically creates a subscription to
that topic for you. Dataflow begins reading from the topic from the time your pipeline starts;
any data published to the topic before your pipeline starts will not be available to your pipeline.
Note: Dataflow creates the necessary Cloud Pub/Sub subscription as part of your
pipeline's setup. As pipeline setup times can vary depending on the number of Compute Engine
instances and other resources allocated in Cloud Platform, it can be difficult to know exactly
when your pipeline has begun reading from the Cloud Pub/Sub topic. If you need finer control over
data read and write timing, you can create and manage your own
subscription to a Cloud Pub/Sub topic, and pass that subscription to
PubsubIO. See Specifying a Cloud Pub/Sub Subscription for more information.
The topic name you provide should follow the format
projects/<Cloud Platform Project Name>/topics/<topic name>, where the project name is the name of the project that
owns the topic. In addition, the topic name should satisfy the following requirements:
- The topic name must be between 3 and 255 characters in length.
- The topic name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
- The topic name must begin with a letter.
- The topic name must end with either a letter or a number.
- The topic name cannot begin with the prefix "goog".
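As an illustrative sketch (the PubsubNames class and isValidTopicName helper are hypothetical, not part of PubsubIO), the rules above can be captured in a small plain-Java validator:

```java
// Hypothetical helper that checks a short topic name (the part after
// "projects/<project>/topics/") against the naming rules listed above.
public class PubsubNames {
    public static boolean isValidTopicName(String name) {
        // 3-255 characters; begins with a lowercase letter; ends with a
        // letter or number; contains only lowercase letters, numbers,
        // dashes, underscores, and periods; and does not begin with the
        // reserved prefix "goog".
        return name.length() >= 3 && name.length() <= 255
            && !name.startsWith("goog")
            && name.matches("[a-z][a-z0-9\\-_.]*[a-z0-9]");
    }
}
```

The subscription names described below follow the same character rules, so the same check applies to their short names.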
Specifying a Cloud Pub/Sub Subscription
When you use a
PubsubIO transform, you can choose to specify a Cloud Pub/Sub subscription
to use when reading from or writing to a particular Cloud Pub/Sub topic. To use a subscription, you
must have already created the subscription yourself; see the
Cloud Pub/Sub Subscriber Documentation for more information on creating
a Cloud Pub/Sub subscription.
The subscription name you provide should follow the format
projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, where the project name is the
name of the project that owns the subscription. In addition, the subscription name you provide
must meet the following requirements:
- The subscription name must be between 3 and 255 characters in length.
- The subscription name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
- The subscription name must begin with a letter.
- The subscription name must end with either a letter or a number.
- The subscription name cannot begin with the prefix "goog".
You should use a subscription if you want to ensure that your pipeline doesn't miss any data from your Cloud Pub/Sub topic. A Cloud Pub/Sub subscription continues to collect data even if there is no pipeline reading from it; when your pipeline starts, it will have access to all of the data collected by the subscription, even data that arrived prior to pipeline startup. Also, if you stop your pipeline and create a new pipeline to read from the same subscription, no data will be lost, as the subscription will continue collecting data in the absence of a pipeline.
Reading with PubsubIO
The PubsubIO.Read transform continuously reads from a Cloud Pub/Sub stream and returns an
unbounded PCollection of Strings that represent the data from
the stream. By default, each element in the resulting
PCollection is encoded as a
UTF-8 string. You can override the default encoding by using
.withCoder when you apply
PubsubIO.Read:
```java
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// streamData is Unbounded; apply windowing afterward.
PCollection<String> streamData = p.apply(PubsubIO.Read.named("ReadFromPubsub")
    .topic("/topics/my-topic"));
```
Reading a Bounded Set of Records from Cloud Pub/Sub
For testing purposes, you can read from Cloud Pub/Sub with the
DirectPipelineRunner (Dataflow SDK for Java 1.x) or the
DirectRunner (Dataflow SDK for Java 2.x).
If you need to operate on a bounded collection for testing purposes, you may supply a bound
on the amount of input to read. You can use either the
.maxNumRecords option to read
a fixed maximum number of records, or you can use
.maxReadTime to read records for a
fixed time duration.
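A minimal sketch of both bounds, assuming the Dataflow SDK for Java 1.x, an existing Pipeline p, and a placeholder topic name:

```java
// Sketch: cap the read for testing, either by record count or by duration.
// Assumes p is an existing Pipeline; "/topics/my-topic" is a placeholder.
PCollection<String> countedData = p.apply(
    PubsubIO.Read.named("ReadBoundedByCount")
        .topic("/topics/my-topic")
        .maxNumRecords(1000));                       // stop after 1000 records

PCollection<String> timedData = p.apply(
    PubsubIO.Read.named("ReadBoundedByTime")
        .topic("/topics/my-topic")
        .maxReadTime(Duration.standardMinutes(5)));  // stop after 5 minutes
```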
Note that deduplication, retries, and recovery during failure are not guaranteed in this mode. For production, you should treat Cloud Pub/Sub as an unbounded source.
Writing with PubsubIO
The PubsubIO.Write transform continuously writes an unbounded
PCollection of String objects to a Cloud Pub/Sub stream. By default, the
PCollection passed to PubsubIO.Write must contain strings encoded in
UTF-8. You can change the expected input type and encoding by using
.withCoder:
```java
// streamData is Unbounded.
PCollection<String> streamData = ...;

streamData.apply(PubsubIO.Write.named("WriteToPubsub")
    .topic("/topics/my-topic"));
```
Timestamps and Record IDs
You can add two types of metadata to records that you read or write using
PubsubIO: timestamps and record IDs.
Using User-Specified Timestamps
You can use user-specified timestamps for precise control over how elements read from Cloud Pub/Sub are
assigned to windows in a Dataflow pipeline. To set user-specified timestamps, you invoke
timestampLabel when constructing
PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice.
If you set a user-specified timestamp label when using
PubsubIO.Read, the transform will use the value of the attribute with the
name of the string you passed to
timestampLabel as the timestamp of each incoming
message. Timestamps should be either in the form of milliseconds since the Unix epoch, or formatted
according to RFC 3339.
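As an illustrative, testable sketch of those two accepted forms (the TimestampParser helper is hypothetical, not part of PubsubIO), a consumer might parse such an attribute value like this:

```java
import java.time.Instant;

// Hypothetical helper: interpret a timestamp attribute value that is either
// milliseconds since the Unix epoch or an RFC 3339 timestamp in UTC.
public class TimestampParser {
    public static Instant parse(String value) {
        try {
            return Instant.ofEpochMilli(Long.parseLong(value));
        } catch (NumberFormatException e) {
            // Instant.parse accepts RFC 3339 timestamps with a "Z" offset,
            // e.g. "2016-01-01T00:00:00Z".
            return Instant.parse(value);
        }
    }
}
```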
If you set a user-specified timestamp label when using
PubsubIO.Write, the transform
will write each element as a Cloud Pub/Sub message with an
attribute of the given name, whose value is
the timestamp of the element in milliseconds since the Unix epoch.
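A minimal sketch of both directions, assuming the Dataflow SDK for Java 1.x; the attribute name "ts" and the topic names are placeholders:

```java
// Sketch: read element timestamps from, and write them to, a message
// attribute named "ts" (a placeholder attribute name).
// Assumes p is an existing Pipeline.
PCollection<String> stamped = p.apply(
    PubsubIO.Read.named("ReadWithTimestamps")
        .topic("/topics/my-topic")
        .timestampLabel("ts"));

stamped.apply(
    PubsubIO.Write.named("WriteWithTimestamps")
        .topic("/topics/other-topic")
        .timestampLabel("ts"));
```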
Using Record IDs
Record IDs allow for exactly-once processing across the boundary between Dataflow and other
systems. To use record IDs, you invoke
idLabel when constructing
PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice.
If you've set a record ID label when using
PubsubIO.Read, when Dataflow receives
multiple messages with the same ID (which will be read from the attribute with
the name of the string you passed to
idLabel), Dataflow will discard all but one of
the messages. However, Dataflow does not perform this de-duplication for messages with the same
record ID value that are published to Cloud Pub/Sub more than 10 minutes apart.
If you set a record ID label when using
PubsubIO.Write, the transform will write an
attribute on all outgoing messages with the
specified name and a unique value. Downstream systems can use this unique value to de-duplicate
messages.
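As an illustrative sketch of what a downstream consumer might do with that attribute (the Deduplicator class is hypothetical, not part of PubsubIO), tracking seen IDs is enough to drop duplicates:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical downstream consumer: process each record ID only once.
// In a real system, the seen-ID set would need expiry (e.g. a time window)
// to bound memory use.
public class Deduplicator {
    private final Set<String> seenIds = new HashSet<>();

    // Returns true if the message should be processed
    // (i.e. this is the first time the ID has been seen).
    public boolean accept(String id) {
        return seenIds.add(id);  // Set.add returns false for duplicates
    }
}
```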