The built-in Read and Write transforms for Pub/Sub are included in PubsubIO. You can use PubsubIO to read data from a Pub/Sub topic or subscription, or to write data to a Pub/Sub topic. In addition, you can achieve exactly-once processing of Pub/Sub message streams, because PubsubIO de-duplicates messages based on custom message identifiers or identifiers assigned by Pub/Sub.
Note: By default, PubsubIO transforms create unbounded PCollections. After you create a PCollection by using PubsubIO.Read, you should apply a windowing strategy to that PCollection before applying any transforms that group elements, such as GroupByKey or Combine.
Specifying a Pub/Sub Topic or Subscription
To use PubsubIO, you can provide either the name of a Pub/Sub topic or a subscription that you've already created for a given topic. If you use a PubsubIO transform with a topic name, Dataflow automatically creates and manages a subscription for you behind the scenes.
Specifying a Pub/Sub Topic
When you use PubsubIO, you can choose to specify a Pub/Sub topic to read from or write to. When you provide a topic name, Dataflow automatically creates a subscription to that topic for you. Dataflow begins reading from the topic when your pipeline starts; any data published to the topic before your pipeline starts will not be available to your pipeline.
Note: Dataflow creates the necessary Pub/Sub subscription as part of your pipeline's setup. Because pipeline setup times vary with the number of Compute Engine instances and other resources allocated in Cloud Platform, it can be difficult to know exactly when your pipeline has begun reading from the Pub/Sub topic. If you need finer control over data read and write timing, you can create and manage your own subscription to a Pub/Sub topic and pass that subscription to PubsubIO. See Specifying a Pub/Sub Subscription for more information.
The topic name you provide should follow the format projects/<Cloud Platform Project Name>/topics/<topic name>, where the project name is the name of the project that owns the topic. In addition, the topic name should satisfy the following requirements:
- The topic name must be between 3 and 255 characters in length.
- The topic name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
- The topic name must begin with a letter.
- The topic name must end with either a letter or a number.
- The topic name cannot begin with the prefix "goog".
Specifying a Pub/Sub Subscription
When you use a PubsubIO transform, you can choose to specify a Pub/Sub subscription to use when reading from a particular Pub/Sub topic. To use a subscription, you must create and manage the subscription yourself; see the Pub/Sub Subscriber Documentation for more information on creating a Pub/Sub subscription.
The subscription name you provide should follow the format projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, where the project name is the name of the project that owns the subscription. In addition, the subscription name you provide must meet the following requirements:
- The subscription name must be between 3 and 255 characters in length.
- The subscription name may only contain lowercase letters, numbers, dashes ("-"), underscores ("_") and periods (".").
- The subscription name must begin with a letter.
- The subscription name must end with either a letter or a number.
- The subscription name cannot begin with the prefix "goog".
You should use a subscription if you want to ensure that your pipeline doesn't miss any data from your Pub/Sub topic. A Pub/Sub subscription continues to collect data even if there is no pipeline reading from it; when your pipeline starts, it will have access to all of the data collected by the subscription, even data that arrived prior to pipeline startup. Also, if you stop your pipeline and create a new pipeline to read from the same subscription, no data will be lost, as the subscription will continue collecting data in the absence of a pipeline.
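The naming requirements listed above are the same for topics and subscriptions, so one check can cover both. The following is a minimal sketch in plain Java, not part of the Dataflow SDK; the class name PubsubResourceNames and its methods are hypothetical, and the tolerance for a leading slash in full paths is an assumption of the sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Dataflow SDK.
final class PubsubResourceNames {
    // One leading lowercase letter, 1 to 253 allowed characters, then a final
    // letter or digit: 3 to 255 characters in total.
    private static final Pattern NAME =
        Pattern.compile("[a-z][a-z0-9._-]{1,253}[a-z0-9]");

    // Full resource path. A leading slash is tolerated; that leniency is an
    // assumption of this sketch.
    private static final Pattern PATH =
        Pattern.compile("/?projects/([^/]+)/(topics|subscriptions)/([^/]+)");

    /** Returns true if a short topic or subscription name satisfies the listed rules. */
    static boolean isValidName(String name) {
        return name != null
            && NAME.matcher(name).matches()
            && !name.startsWith("goog");
    }

    /** Builds "projects/<project>/topics/<topic>" after validating the topic name. */
    static String topicPath(String project, String topic) {
        return path(project, "topics", topic);
    }

    /** Builds "projects/<project>/subscriptions/<name>" after validating the name. */
    static String subscriptionPath(String project, String subscription) {
        return path(project, "subscriptions", subscription);
    }

    /** Returns true if the full path has the expected shape and a valid final name. */
    static boolean isValidPath(String path) {
        Matcher m = PATH.matcher(path);
        return m.matches() && isValidName(m.group(3));
    }

    private static String path(String project, String kind, String name) {
        if (!isValidName(name)) {
            throw new IllegalArgumentException("Invalid Pub/Sub name: " + name);
        }
        return "projects/" + project + "/" + kind + "/" + name;
    }
}
```

Checking names before constructing the pipeline surfaces a malformed topic or subscription early, rather than at pipeline setup time.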
Reading with PubsubIO
The PubsubIO.Read transform continuously reads from a Pub/Sub stream and returns an unbounded PCollection of Strings that represent the data from the stream. By default, each element in the resulting PCollection is encoded as a UTF-8 string. You can override the default encoding by using .withCoder when you call PubsubIO.Read.
Java
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// streamData is unbounded; apply windowing afterward.
// "my-project" and "my-topic" are placeholder names.
PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
        .topic("projects/my-project/topics/my-topic"));
Reading a Bounded Set of Records from Pub/Sub
For testing purposes, you can read from Pub/Sub with the InProcessPipelineRunner (Dataflow SDK for Java 1.X) or the DirectRunner (Dataflow SDK for Java 2.X).
If you need to operate on a bounded collection for testing purposes, you may supply a bound on the amount of input to read. You can use either the .maxNumRecords option to read a fixed maximum number of records, or .maxReadTime to read records for a fixed time duration.
Note that deduplication, retries, and recovery during failure are not guaranteed in this mode. For production, you should treat Pub/Sub as an unbounded source.
Writing with PubsubIO
The PubsubIO.Write transform continuously writes an unbounded PCollection of String objects to a Pub/Sub stream. By default, the input PCollection to PubsubIO.Write must contain strings encoded in UTF-8. You can change the expected input type and encoding by using .withCoder.
Java
// streamData is unbounded.
// "my-project" and "my-topic" are placeholder names.
PCollection<String> streamData = ...;
streamData.apply(PubsubIO.Write.named("WriteToPubsub")
    .topic("projects/my-project/topics/my-topic"));
Timestamps and Record IDs
You can add two types of metadata to records that you read or write using PubsubIO: timestamps and record IDs.
Using User-Specified Timestamps
You can use user-specified timestamps for precise control over how elements read from Pub/Sub are assigned to windows in a Dataflow pipeline. To set user-specified timestamps, you invoke timestampLabel when constructing PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice.
If you set a user-specified timestamp label when using PubsubIO.Read, the transform uses the value of the message attribute whose name is the string you passed to timestampLabel as the timestamp of each incoming message. Timestamps should be either in the form of milliseconds since the Unix epoch or formatted according to RFC 3339.
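To make the two accepted timestamp forms concrete, here is a minimal sketch in plain Java of how an attribute value could be interpreted. It is not the parsing code PubsubIO uses; the class name TimestampAttribute is hypothetical, and the sketch assumes the common RFC 3339 form with a "T" separator and an explicit offset or "Z".

```java
import java.time.Instant;
import java.time.OffsetDateTime;

// Hypothetical helper, not part of the Dataflow SDK.
final class TimestampAttribute {
    /**
     * Interprets an attribute value as milliseconds since the Unix epoch if it
     * is a plain integer, and otherwise as an RFC 3339 date-time.
     * A value in neither form raises java.time.format.DateTimeParseException.
     */
    static Instant parse(String value) {
        try {
            return Instant.ofEpochMilli(Long.parseLong(value));
        } catch (NumberFormatException notMillis) {
            // Not a plain integer, so fall back to the date-time form,
            // for example "2015-10-29T23:41:41.123Z".
            return OffsetDateTime.parse(value).toInstant();
        }
    }
}
```

Both forms resolve to the same kind of value, an instant on the timeline, so a publisher can use whichever is more convenient for the chosen attribute.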
If you set a user-specified timestamp label when using PubsubIO.Write, the transform will write each element as a Pub/Sub message with an attribute of the given name whose value is the timestamp of the element in milliseconds since the Unix epoch.
Using Record IDs
Record IDs allow for exactly-once processing across the boundary between Dataflow and other systems. To use record IDs, you invoke idLabel when constructing PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice.
If you've set a record ID label when using PubsubIO.Read and Dataflow receives multiple messages with the same ID (read from the attribute whose name is the string you passed to idLabel), Dataflow discards all but one of the messages. However, Dataflow does not perform this de-duplication for messages with the same record ID value that are published to Pub/Sub more than 10 minutes apart.
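The effect of the 10-minute limit can be illustrated with a small in-memory model. This is only a sketch of the behavior described above, not Dataflow's implementation; the class RecordIdDeduplicator is hypothetical, and it keeps every ID it has seen without eviction, which a real system would not do.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the described behavior, not Dataflow's implementation.
final class RecordIdDeduplicator {
    private static final Duration WINDOW = Duration.ofMinutes(10);

    // Publish time of the most recently accepted message for each record ID.
    // Entries are never evicted in this sketch.
    private final Map<String, Instant> accepted = new HashMap<>();

    /**
     * Returns true if the message should be kept, or false if a message with
     * the same record ID was accepted with a publish time at most 10 minutes away.
     */
    boolean accept(String recordId, Instant publishTime) {
        Instant previous = accepted.get(recordId);
        if (previous != null
                && Duration.between(previous, publishTime).abs().compareTo(WINDOW) <= 0) {
            return false; // duplicate within the window: discard
        }
        accepted.put(recordId, publishTime);
        return true;
    }
}
```

In this model, a retry published one minute after the original is dropped, while a message reusing the same ID eleven minutes later passes through, so publishers should not rely on record IDs to suppress late duplicates.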
If you set a record ID label when using PubsubIO.Write, the transform will write an attribute on all outgoing messages with the specified name and a unique value. Downstream systems can use this unique value to de-duplicate messages.
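As a rough picture of what the write side produces, the sketch below builds the attribute map for one outgoing message in plain Java. The class OutgoingAttributes is hypothetical, and using a random UUID as the unique value is an assumption; the text above only states that the value is unique.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration, not part of the Dataflow SDK.
final class OutgoingAttributes {
    /**
     * Builds the attributes for one outgoing message: the element timestamp in
     * milliseconds since the Unix epoch under timestampLabel, and a unique
     * value under idLabel (a random UUID here, which is an assumption).
     */
    static Map<String, String> build(
            String timestampLabel, String idLabel, Instant elementTimestamp) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put(timestampLabel, Long.toString(elementTimestamp.toEpochMilli()));
        attributes.put(idLabel, UUID.randomUUID().toString());
        return attributes;
    }
}
```

A downstream consumer can then read the attribute named by idLabel and drop any message whose value it has already processed.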