Class Publisher (2.23.0-rc)

Publish messages to the Cloud Pub/Sub service.

This class is used to publish messages to a fixed topic, with a fixed configuration such as credentials, batching, background threads, etc. Applications that publish messages to multiple topics need to create separate instances of this class. Applications wanting to publish events with different batching configuration also need to create separate instances.

See Also

https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::StatusOr;
  [](pubsub::Publisher publisher) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}.SetData("Hello World!").Build());
    auto done = message_id.then([](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Hello World! published with id=" << *id << "\n";
    });
    // Block until the message is published
    done.get();
  }
Message Ordering

A Publisher configured to preserve message ordering will sequence the messages that share a common ordering key (see MessageBuilder::SetOrderingKey()). Messages will be batched by ordering key, and new batches will wait until the status of the previous batch is known. On an error, all pending and queued messages are discarded, and the publisher rejects any new messages for the ordering key that experienced problems. The application must call Publisher::ResumePublishing() to to restore publishing.

Performance

Publisher objects are relatively cheap to create, copy, and move. However, each Publisher object must be created with a std::shared_ptr<PublisherConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakePublisherConnection() method and the PublisherConnection interface for more details.

Thread Safety

Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Access to these copies via multiple threads is guaranteed to work. Two threads operating on the same instance of this class is not guaranteed to work.

Background Threads

This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.

Example: using a custom thread pool
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::GrpcCompletionQueueOption;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string topic_id) {
    // Create our own completion queue to run the background activity, such as
    // flushing the publisher.
    google::cloud::CompletionQueue cq;
    // Setup one or more of threads to service this completion queue. These must
    // remain running until all the work is done.
    std::vector<std::thread> tasks;
    std::generate_n(std::back_inserter(tasks), 4, [&cq] {
      return std::thread([cq]() mutable { cq.Run(); });
    });

    auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
        std::move(topic), Options{}.set<GrpcCompletionQueueOption>(cq)));

    std::vector<future<void>> ids;
    for (char const* data : {"1", "2", "3", "go!"}) {
      ids.push_back(
          publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
              .then([data](future<StatusOr<std::string>> f) {
                auto s = f.get();
                if (!s) return;
                std::cout << "Sent '" << data << "' (" << *s << ")\n";
              }));
    }
    publisher.Flush();
    // Block until they are actually sent.
    for (auto& id : ids) id.get();

    // Shutdown the completion queue and join the threads
    cq.Shutdown();
    for (auto& t : tasks) t.join();
  }
Asynchronous Functions

Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return value of the attached function. More details in the google::cloud::future documentation.

Error Handling

This class uses StatusOr<T> to report errors. When an operation fails to perform its work the returned StatusOr<T> contains the error details. If the ok() member function in the StatusOr<T> returns true then it contains the expected result. Please consult the google::cloud::StatusOr documentation for more details.

Batching Configuration Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string topic_id) {
    auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
    // By default, the publisher will flush a batch after 10ms, after it
    // contains more than 100 message, or after it contains more than 1MiB of
    // data, whichever comes first. This changes those defaults.
    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
        std::move(topic),
        Options{}
            .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
            .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
            .set<pubsub::MaxBatchMessagesOption>(200)));

    std::vector<future<void>> ids;
    for (char const* data : {"1", "2", "3", "go!"}) {
      ids.push_back(
          publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
              .then([data](future<StatusOr<std::string>> f) {
                auto s = f.get();
                if (!s) return;
                std::cout << "Sent '" << data << "' (" << *s << ")\n";
              }));
    }
    publisher.Flush();
    // Block until they are actually sent.
    for (auto& id : ids) id.get();
  }

Constructors

Publisher(std::shared_ptr< PublisherConnection >)

Parameter
NameDescription
connection std::shared_ptr< PublisherConnection >

Publisher(Publisher const &)

Parameter
NameDescription
Publisher const &

Publisher(Publisher &&)

Parameter
NameDescription
Publisher &&

Publisher(std::shared_ptr< PublisherConnection >, PublisherOptions const &)

Parameters
NameDescription
connection std::shared_ptr< PublisherConnection >
PublisherOptions const &

Operators

operator=(Publisher const &)

Parameter
NameDescription
Publisher const &
Returns
TypeDescription
Publisher &

operator=(Publisher &&)

Parameter
NameDescription
Publisher &&
Returns
TypeDescription
Publisher &

Functions

Publish(Message)

Publishes a message to this publisher's topic.

Note that the message may be batched, depending on the Publisher's configuration. It could be delayed until the batch has enough messages, or enough data, or enough time has elapsed. See the PublisherOptionList documentation for more details.

Idempotency

This is a non-idempotent operation, but the client library will automatically retry RPCs that fail with transient errors. As Cloud Pub/Sub has "at least once" delivery semantics applications are expected to handle duplicate messages without problems. The application can disable retries by changing the retry policy, please see the example below.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::StatusOr;
  [](pubsub::Publisher publisher) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}.SetData("Hello World!").Build());
    auto done = message_id.then([](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Hello World! published with id=" << *id << "\n";
    });
    // Block until the message is published
    done.get();
  }
Disabling Retries Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string topic_id) {
    auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
        std::move(topic),
        Options{}
            .set<pubsub::RetryPolicyOption>(
                pubsub::LimitedErrorCountRetryPolicy(/*maximum_failures=*/0)
                    .clone())
            .set<pubsub::BackoffPolicyOption>(
                pubsub::ExponentialBackoffPolicy(
                    /*initial_delay=*/std::chrono::milliseconds(200),
                    /*maximum_delay=*/std::chrono::seconds(45),
                    /*scaling=*/2.0)
                    .clone())));

    std::vector<future<bool>> done;
    for (char const* data : {"1", "2", "3", "go!"}) {
      done.push_back(
          publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
              .then([](future<StatusOr<std::string>> f) {
                return f.get().ok();
              }));
    }
    publisher.Flush();
    int count = 0;
    for (auto& f : done) {
      if (f.get()) ++count;
    }
    std::cout << count << " messages sent successfully\n";
  }
Changing Retry Parameters Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string topic_id) {
    auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
    // By default a publisher will retry for 60 seconds, with an initial backoff
    // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
    // 30% after each attempt. This changes those defaults.
    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
        std::move(topic),
        Options{}
            .set<pubsub::RetryPolicyOption>(
                pubsub::LimitedTimeRetryPolicy(
                    /*maximum_duration=*/std::chrono::minutes(10))
                    .clone())
            .set<pubsub::BackoffPolicyOption>(
                pubsub::ExponentialBackoffPolicy(
                    /*initial_delay=*/std::chrono::milliseconds(200),
                    /*maximum_delay=*/std::chrono::seconds(45),
                    /*scaling=*/2.0)
                    .clone())));

    std::vector<future<bool>> done;
    for (char const* data : {"1", "2", "3", "go!"}) {
      done.push_back(
          publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
              .then([](future<StatusOr<std::string>> f) {
                return f.get().ok();
              }));
    }
    publisher.Flush();
    int count = 0;
    for (auto& f : done) {
      if (f.get()) ++count;
    }
    std::cout << count << " messages sent successfully\n";
  }
Parameter
NameDescription
m Message
Returns
TypeDescription
future< StatusOr< std::string > >

a future that becomes satisfied when the message is published or on a unrecoverable error. On success, the future is satisfied with the server-assigned ID of the message. IDs are guaranteed to be unique within the topic.

Flush()

Forcibly publishes any batched messages.

As applications can configure a Publisher to buffer messages, it is sometimes useful to flush them before any of the normal criteria to send the RPCs is met.

Idempotency

See the description in Publish().

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::StatusOr;
  [](pubsub::Publisher publisher) {
    std::vector<future<void>> done;
    for (int i = 0; i != 10; ++i) {
      auto message_id = publisher.Publish(
          pubsub::MessageBuilder{}
              .SetData("Hello World! [" + std::to_string(i) + "]")
              .SetAttribute("origin", "cpp-sample")
              .SetAttribute("username", "gcp")
              .Build());
      done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
        auto id = f.get();
        if (!id) throw std::move(id).status();
        std::cout << "Message " << i << " published with id=" << *id << "\n";
      }));
    }
    publisher.Flush();
    // Block until all the messages are published (optional)
    for (auto& f : done) f.get();
  }
Returns
TypeDescription
void

ResumePublish(std::string)

Resumes publishing after an error.

If the publisher options have message ordering enabled (see MessageOrderingOption) all messages for a key that experience failure will be rejected until the application calls this function.

Idempotency

This function never initiates a remote RPC, so there are no considerations around retrying it. Note, however, that more than one Publish() request may fail for the same ordering key. The application needs to call this function after each error before it can resume publishing messages with the same ordering key.

Example
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::StatusOr;
  [](pubsub::Publisher publisher) {
    struct SampleData {
      std::string ordering_key;
      std::string data;
    } data[] = {
        {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
        {"key1", "message4"}, {"key1", "message5"},
    };
    std::vector<future<void>> done;
    for (auto& datum : data) {
      auto const& da = datum;  // workaround MSVC lambda capture confusion
      auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
        auto const msg = da.ordering_key + "#" + da.data;
        auto id = f.get();
        if (!id) {
          std::cout << "An error has occurred publishing " << msg << "\n";
          publisher.ResumePublish(da.ordering_key);
          return;
        }
        std::cout << "Message " << msg << " published as id=" << *id << "\n";
      };
      done.push_back(
          publisher
              .Publish(pubsub::MessageBuilder{}
                           .SetData("Hello World! [" + datum.data + "]")
                           .SetOrderingKey(datum.ordering_key)
                           .Build())
              .then(handler));
    }
    publisher.Flush();
    // Block until all the messages are published (optional)
    for (auto& f : done) f.get();
  }
Parameter
NameDescription
ordering_key std::string
Returns
TypeDescription
void