Class Subscriber (2.29.0-rc)

Receive messages from the Cloud Pub/Sub service.

This class is used to receive messages from a given subscription, with a fixed configuration such as credentials and background threads. Applications that receive messages from multiple subscriptions need to create separate instances of this class. Applications that want to receive messages with different configuration parameters also need to create separate instances.

See Also

https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.

Example: subscriber quickstart
  namespace pubsub = ::google::cloud::pubsub;
  auto sample = [](pubsub::Subscriber subscriber) {
    return subscriber.Subscribe(
        [&](pubsub::Message const& m, pubsub::AckHandler h) {
          std::cout << "Received message " << m << "\n";
          std::move(h).ack();
        });
  };
Performance

Subscriber objects are relatively cheap to create, copy, and move. However, each Subscriber object must be created with a std::shared_ptr<SubscriberConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakeSubscriberConnection() function and the SubscriberConnection interface for more details.

Thread Safety

Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Access to these copies via multiple threads is guaranteed to work. Concurrent access to the same instance of this class from two or more threads is not guaranteed to work.
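
The sharing and threading rules above can be sketched with standard-library types only. `FakeConnection` and `Handle` are hypothetical stand-ins for `SubscriberConnection` and `Subscriber`; they are not part of the library and model only the ownership pattern: copies are cheap, all copies share one connection, and each thread works on its own copy.

```cpp
#include <atomic>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an expensive, shareable connection object.
struct FakeConnection {
  std::atomic<int> calls{0};
};

// Hypothetical stand-in for a cheap handle such as Subscriber: copying it
// only copies a std::shared_ptr, so all copies share one connection.
class Handle {
 public:
  explicit Handle(std::shared_ptr<FakeConnection> c)
      : connection_(std::move(c)) {}
  void Use() { ++connection_->calls; }

 private:
  std::shared_ptr<FakeConnection> connection_;
};

// Gives each thread its own copy of the handle instead of sharing a single
// instance across threads. Returns the number of calls seen by the shared
// connection.
int RunWithOneCopyPerThread(int thread_count) {
  auto connection = std::make_shared<FakeConnection>();
  Handle original(connection);
  std::vector<std::thread> threads;
  for (int i = 0; i != thread_count; ++i) {
    // The lambda owns a copy of `original`; `mutable` permits calling Use().
    threads.emplace_back([copy = original]() mutable { copy.Use(); });
  }
  for (auto& t : threads) t.join();
  return connection->calls.load();
}
```

Calling `RunWithOneCopyPerThread(4)` starts four threads, each using its own copy, while only one `FakeConnection` is ever created.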

Background Threads

This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.

Example: using a custom thread pool
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::GrpcCompletionQueueOption;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  [](std::string project_id, std::string subscription_id) {
    // Create our own completion queue to run the background activity.
    google::cloud::CompletionQueue cq;
    // Set up one or more threads to service this completion queue. These must
    // remain running until all the work is done.
    std::vector<std::thread> tasks;
    std::generate_n(std::back_inserter(tasks), 4, [&cq] {
      return std::thread([cq]() mutable { cq.Run(); });
    });

    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}.set<GrpcCompletionQueueOption>(cq)));

    // Because this is an example we want to exit eventually, use a mutex and
    // condition variable to notify the current thread and stop the example.
    std::mutex mu;
    std::condition_variable cv;
    int count = 0;
    auto await_count = [&] {
      std::unique_lock<std::mutex> lk(mu);
      cv.wait(lk, [&] { return count >= 4; });
    };
    auto increase_count = [&] {
      std::unique_lock<std::mutex> lk(mu);
      if (++count >= 4) cv.notify_one();
    };

    // Receive messages in the previously allocated thread pool.
    auto session = subscriber.Subscribe(
        [&](pubsub::Message const& m, pubsub::AckHandler h) {
          std::cout << "Received message " << m << "\n";
          increase_count();
          std::move(h).ack();
        });
    await_count();
    session.cancel();
    // Report any final status, blocking until the subscription loop completes,
    // either with a failure or because it was canceled.
    auto status = session.get();
    std::cout << "Message count=" << count << ", status=" << status << "\n";

    // Shut down the completion queue and join the threads.
    cq.Shutdown();
    for (auto& t : tasks) t.join();
  }
Asynchronous Functions

Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return type of the attached function. See the google::cloud::future documentation for more details.
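
To illustrate the continuation style described above without depending on the library, the sketch below builds a similar effect on top of `std::future`. `Then` and `Demo` are hypothetical names introduced for this illustration and are not the library's `.then()` member. The sketch also assumes that using one extra thread per continuation is acceptable, which is a simplification.

```cpp
#include <future>
#include <string>
#include <utility>

// Hypothetical helper: waits for `f` on another thread, then calls `fn` with
// the (now satisfied) future. Returns a future holding fn's result.
template <typename T, typename F>
auto Then(std::future<T> f, F fn)
    -> std::future<decltype(fn(std::move(f)))> {
  return std::async(
      std::launch::async,
      [](std::future<T> ready, F callback) {
        ready.wait();
        return callback(std::move(ready));
      },
      std::move(f), std::move(fn));
}

// Chains a continuation that formats the value produced by the first future.
std::string Demo() {
  std::promise<int> p;
  auto chained = Then(p.get_future(), [](std::future<int> f) {
    return "value=" + std::to_string(f.get());
  });
  p.set_value(42);  // Satisfying the first future triggers the callback.
  return chained.get();
}
```

As in the description above, the callback receives the satisfied future and the chained future carries the callback's return type, here `std::string`.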

Error Handling

This class uses StatusOr<T> to report errors. When an operation fails to perform its work, the returned StatusOr<T> contains the error details. If the ok() member function of the StatusOr<T> returns true, then it contains the expected result. Please consult the google::cloud::StatusOr documentation for more details.
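
The calling pattern can be sketched without the library. `MiniStatusOr`, `ParsePort`, and `Describe` are hypothetical names written for this illustration; `MiniStatusOr` is a heavily simplified stand-in and should not be read as the interface of `google::cloud::StatusOr`. It shows only the idea of checking `ok()` before using the value.

```cpp
#include <string>
#include <utility>

// Hypothetical, simplified stand-in: holds either a value or error details.
template <typename T>
class MiniStatusOr {
 public:
  static MiniStatusOr Ok(T value) {
    MiniStatusOr r;
    r.ok_ = true;
    r.value_ = std::move(value);
    return r;
  }
  static MiniStatusOr Error(std::string message) {
    MiniStatusOr r;
    r.error_ = std::move(message);
    return r;
  }
  bool ok() const { return ok_; }
  T const& value() const { return value_; }
  std::string const& error() const { return error_; }

 private:
  MiniStatusOr() = default;
  bool ok_ = false;
  T value_{};
  std::string error_;
};

// Hypothetical operation that can fail: parse a TCP port number.
MiniStatusOr<int> ParsePort(std::string const& text) {
  if (text.empty() || text.size() > 5) {
    return MiniStatusOr<int>::Error("invalid length");
  }
  int port = 0;
  for (char c : text) {
    if (c < '0' || c > '9') {
      return MiniStatusOr<int>::Error("not a number: " + text);
    }
    port = port * 10 + (c - '0');
  }
  if (port > 65535) return MiniStatusOr<int>::Error("out of range: " + text);
  return MiniStatusOr<int>::Ok(port);
}

// Callers check ok() first and only then read the value.
std::string Describe(std::string const& text) {
  auto port = ParsePort(text);
  if (!port.ok()) return "error: " + port.error();
  return "port " + std::to_string(port.value());
}
```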

Example: changing retry parameters
  namespace pubsub = ::google::cloud::pubsub;
  using ::google::cloud::future;
  using ::google::cloud::Options;
  using ::google::cloud::StatusOr;
  auto sample = [](std::string project_id, std::string subscription_id) {
    // By default a subscriber will retry for 60 seconds, with an initial
    // backoff of 100ms, a maximum backoff of 60 seconds, and the backoff will
    // grow by 30% after each attempt. This changes those defaults.
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
        Options{}
            .set<pubsub::RetryPolicyOption>(
                pubsub::LimitedTimeRetryPolicy(
                    /*maximum_duration=*/std::chrono::minutes(1))
                    .clone())
            .set<pubsub::BackoffPolicyOption>(
                pubsub::ExponentialBackoffPolicy(
                    /*initial_delay=*/std::chrono::milliseconds(200),
                    /*maximum_delay=*/std::chrono::seconds(10),
                    /*scaling=*/2.0)
                    .clone())));

    auto session = subscriber.Subscribe(
        [](pubsub::Message const& m, pubsub::AckHandler h) {
          std::move(h).ack();
          std::cout << "Received message " << m << "\n";
        });
    return std::make_pair(subscriber, std::move(session));
  };
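
The backoff values passed in the example above (200ms initial delay, 10s maximum delay, scaling factor 2.0) imply a nominal delay schedule that can be worked out by hand. The sketch below computes it with a hypothetical `BackoffSchedule` function that is not part of the library. It assumes plain multiplication capped at the maximum and ignores any randomization the library may apply, so the numbers are illustrative only.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

// Hypothetical helper: returns the nominal delay before each of the first
// `attempts` retries, multiplying by `scaling` and capping at the maximum.
std::vector<std::chrono::milliseconds> BackoffSchedule(
    std::chrono::milliseconds initial_delay,
    std::chrono::milliseconds maximum_delay, double scaling, int attempts) {
  std::vector<std::chrono::milliseconds> delays;
  auto current = initial_delay;
  for (int i = 0; i != attempts; ++i) {
    delays.push_back(current);
    auto next = std::chrono::duration_cast<std::chrono::milliseconds>(
        current * scaling);
    current = std::min(next, maximum_delay);
  }
  return delays;
}
```

With the values from the example, the first eight nominal delays are 200, 400, 800, 1600, 3200, 6400, 10000, and 10000 milliseconds: the delay doubles until it reaches the 10 second cap.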

Constructors

Subscriber(std::shared_ptr< SubscriberConnection >, Options)

Parameters
Name Description
connection std::shared_ptr< SubscriberConnection >
opts Options

Functions

Subscribe(ApplicationCallback, Options)

Creates a new session to receive messages from the subscription.

Idempotency

This is an idempotent operation; it only reads messages from the service. The library will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established, the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the retry policies as described earlier.

Note that calling AckHandler::ack() and/or AckHandler::nack() is handled differently with respect to retrying. Check the documentation of these functions for details.

Example
  namespace pubsub = ::google::cloud::pubsub;
  auto sample = [](pubsub::Subscriber subscriber) {
    return subscriber.Subscribe(
        [&](pubsub::Message const& m, pubsub::AckHandler h) {
          std::cout << "Received message " << m << "\n";
          std::move(h).ack();
        });
  };
Parameters
Name Description
cb ApplicationCallback

the callable invoked when messages are received.

opts Options

any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.

Returns
Type Description
future< Status >

a future that is satisfied when the session will no longer receive messages, for example because there was an unrecoverable error trying to receive data. Calling .cancel() on this object will (eventually) terminate the session and satisfy the future.

Subscribe(ExactlyOnceApplicationCallback, Options)

Creates a new session to receive messages from the subscription using exactly-once delivery.

Idempotency

This is an idempotent operation; it only reads messages from the service. The library will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established, the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the retry policies as described earlier.

Note that ExactlyOnceAckHandler::ack() and ExactlyOnceAckHandler::nack() have their own rules with respect to retrying. Check the documentation of these functions for details.

Example
  namespace pubsub = ::google::cloud::pubsub;
  auto sample = [](pubsub::Subscriber subscriber) {
    return subscriber.Subscribe(
        [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
          std::cout << "Received message " << m << "\n";
          std::move(h).ack().then([id = m.message_id()](auto f) {
            auto status = f.get();
            std::cout << "Message id " << id
                      << " ack() completed with status=" << status << "\n";
          });
        });
  };
Parameters
Name Description
cb ExactlyOnceApplicationCallback

the callable invoked when messages are received.

opts Options

any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.

Returns
Type Description
future< Status >

a future that is satisfied when the session will no longer receive messages, for example because there was an unrecoverable error trying to receive data. Calling .cancel() on this object will (eventually) terminate the session and satisfy the future.

Pull(Options)

Pulls one message from the subscription.

Idempotency

This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to pull a message from the service, subject to the retry policies configured in the SubscriberConnection.

Note that PullAckHandler::ack() and PullAckHandler::nack() have their own rules with respect to retrying. Check the documentation of these functions for details.

Example
  [](google::cloud::pubsub::Subscriber subscriber) {
    auto response = subscriber.Pull();
    if (!response) throw std::move(response).status();
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
  }
Parameter
Name Description
opts Options

any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.

Returns
Type Description
StatusOr< PullResponse >

a response including the message and a PullAckHandler to notify the library when the message has been successfully handled.