Cloud Pub/Sub API - Class Google::Cloud::PubSub::AsyncPublisher (v2.16.0)

Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::AsyncPublisher.

Used to publish multiple messages in batches to a topic. See Topic#async_publisher.

Inherits

  • Object

Includes

  • MonitorMixin

Example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop!

Methods

#callback_threads

def callback_threads() -> Numeric

The number of threads to handle the published messages' callbacks. Default is 4.

Returns
  • (Numeric) — the current value of callback_threads

#enable_message_ordering!

def enable_message_ordering!()

Enables message ordering for messages with ordering keys. When enabled, messages published with the same ordering_key will be delivered in the order they were published.

See #message_ordering?. See Topic#publish_async, Subscription#listen, and Message#ordering_key.
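A minimal sketch of ordered publishing, assuming a topic object obtained from pubsub.topic as in the example at the top of this page. The helper name publish_in_order is hypothetical. It relies only on Topic#enable_message_ordering! and the ordering_key: argument of Topic#publish_async.

```ruby
# Hypothetical helper: publishes every payload under one ordering key.
# `topic` is expected to behave like Google::Cloud::PubSub::Topic.
def publish_in_order topic, ordering_key, payloads
  # Ordering must be enabled before messages with an ordering key are published.
  topic.enable_message_ordering!

  payloads.each do |payload|
    topic.publish_async payload, ordering_key: ordering_key
  end
end

# Real usage (requires the google-cloud-pubsub gem and credentials):
#   topic = pubsub.topic "my-topic"
#   publish_in_order topic, "customer-42", ["created", "updated", "closed"]
```

Subscribers only observe this order if the subscription itself has message ordering enabled, which is outside the scope of this class.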

#flow_control

def flow_control()

Returns the value of attribute flow_control.

#flush

def flush() -> AsyncPublisher

Forces all messages in the current batch to be published immediately.
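As an illustration, the sketch below queues a few payloads and then flushes, so the batch is sent without waiting for interval, max_bytes, or max_messages to be reached. The helper name publish_and_flush is hypothetical, and publisher is assumed to be the object returned by topic.async_publisher after at least one asynchronous publish.

```ruby
# Hypothetical helper: queue every payload, then publish the current batch
# right away. `publisher` is expected to behave like AsyncPublisher.
def publish_and_flush publisher, payloads
  payloads.each { |payload| publisher.publish payload }
  publisher.flush
end

# Real usage (requires the google-cloud-pubsub gem and credentials):
#   publish_and_flush topic.async_publisher, ["first", "second"]
```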

Returns
  • (AsyncPublisher) — returns self so calls can be chained.

#interval

def interval() -> Numeric

The number of seconds to collect messages before the batch is published. Default is 0.01.

Returns
  • (Numeric) — the current value of interval

#max_bytes

def max_bytes() -> Integer

The maximum total size, in bytes, of messages to be collected before the batch is published. Default is 1,000,000 (1 MB).

Returns
  • (Integer) — the current value of max_bytes

#max_messages

def max_messages() -> Integer

The maximum number of messages to be collected before the batch is published. Default is 100.

Returns
  • (Integer) — the current value of max_messages
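The batching limits interval, max_bytes, and max_messages are read-only on the publisher. In the google-cloud-pubsub gem they are normally supplied through the async: option when the topic is looked up. The sketch below builds such an options hash from the defaults documented above. The constant ASYNC_DEFAULTS and the helper async_options are hypothetical names used for illustration.

```ruby
# Defaults as documented for AsyncPublisher on this page.
ASYNC_DEFAULTS = {
  max_bytes:    1_000_000, # bytes per batch
  max_messages: 100,       # messages per batch
  interval:     0.01       # seconds to collect messages
}.freeze

# Hypothetical helper: merges overrides into the defaults and rejects
# keys that are not batching limits.
def async_options overrides = {}
  unknown = overrides.keys - ASYNC_DEFAULTS.keys
  raise ArgumentError, "unknown async options: #{unknown.inspect}" unless unknown.empty?

  ASYNC_DEFAULTS.merge overrides
end

# Real usage (requires the google-cloud-pubsub gem and credentials):
#   require "google/cloud/pubsub"
#   pubsub = Google::Cloud::PubSub.new
#   topic  = pubsub.topic "my-topic", async: async_options(interval: 0.05)
```

A longer interval or larger limits generally mean fewer, bigger publish requests at the cost of added latency per message.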

#message_ordering?

def message_ordering?() -> Boolean

Whether message ordering for messages with ordering keys has been enabled. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See #enable_message_ordering!. See Topic#publish_async, Subscription#listen, and Message#ordering_key.

Returns
  • (Boolean)

#publish

def publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback) { |result| ... }

Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Topic#publish_async.
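A minimal sketch of publishing with a callback. Because callbacks are handled on separate callback threads, the outcome is pushed onto a thread-safe Queue instead of being returned. The helper name publish_and_record and the outcomes queue are hypothetical. The result object is assumed to respond to succeeded?, message_id, and error, as PublishResult does.

```ruby
# Hypothetical helper: publishes one message and records the outcome.
# `publisher` is expected to behave like AsyncPublisher;
# `outcomes` is a Queue that collects [:ok, id] or [:failed, error] pairs.
def publish_and_record publisher, data, attributes, outcomes
  publisher.publish data, attributes do |result|
    if result.succeeded?
      outcomes << [:ok, result.message_id]
    else
      outcomes << [:failed, result.error]
    end
  end
end

# Real usage (requires the google-cloud-pubsub gem and credentials):
#   outcomes = Queue.new
#   publish_and_record topic.async_publisher, "task completed", { "source" => "worker" }, outcomes
#   topic.async_publisher.stop!   # wait for pending messages and callbacks
#   outcomes.pop                  # first recorded outcome
```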

Parameters
  • data (String, File) — The message payload. This will be converted to bytes encoded as ASCII-8BIT.
  • attributes (Hash) — Optional attributes for the message.
  • ordering_key (String) (defaults to: nil) — Identifies related messages for which publish order should be respected.
Yields
  • (result) — the callback for when the message has been published
Yield Parameter
  • result (PublishResult) — the result of the asynchronous publish
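Raises
  • (Google::Cloud::PubSub::AsyncPublisherStopped) — when the publisher is stopped. (See #stop and #stopped?.)
  • (Google::Cloud::PubSub::OrderedMessagesDisabled) — when publishing a message with an ordering_key but ordered messages are not enabled. (See #message_ordering? and #enable_message_ordering!.)
  • (Google::Cloud::PubSub::OrderingKeyError) — when publishing a message with an ordering_key that has already failed when publishing. Use #resume_publish to allow this ordering_key to be published again.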

#publish_threads

def publish_threads() -> Numeric

The number of threads used to publish messages. Default is 2.

Returns
  • (Numeric) — the current value of publish_threads

#resume_publish

def resume_publish(ordering_key) -> Boolean

Resume publishing ordered messages for the provided ordering key.

Parameter
  • ordering_key (String) — Identifies related messages for which publish order should be respected.
Returns
  • (Boolean) — true when resumed, false otherwise.
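The sketch below shows one way to use the return value. It assumes that an earlier publish for the ordering key failed and that further publishes for that key are rejected until the key is resumed. The helper name republish_after_failure is hypothetical.

```ruby
# Hypothetical helper: resumes an ordering key and, only if that worked,
# publishes the payload again under the same key.
# `publisher` is expected to behave like AsyncPublisher.
def republish_after_failure publisher, ordering_key, data
  resumed = publisher.resume_publish ordering_key
  publisher.publish data, ordering_key: ordering_key if resumed
  resumed
end

# Real usage (requires the google-cloud-pubsub gem and credentials):
#   republish_after_failure topic.async_publisher, "customer-42", "updated"
```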

#started?

def started?() -> Boolean

Whether the publisher has been started.

Returns
  • (Boolean) — true when started, false otherwise.

#stop

def stop() -> AsyncPublisher

Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.

Returns
  • (AsyncPublisher) — returns self so calls can be chained.

#stop!

def stop!(timeout = nil) -> AsyncPublisher

Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.

The same as calling #stop and #wait!.
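To make the equivalence concrete, the sketch below writes the two-step shutdown out explicitly. The helper name shutdown is hypothetical, and publisher is assumed to behave like AsyncPublisher.

```ruby
# Hypothetical helper: explicit two-step shutdown.
def shutdown publisher, timeout = nil
  publisher.stop           # no new messages accepted; queued ones still publish
  publisher.wait! timeout  # block until drained, or until timeout seconds pass
end

# One-call equivalent, per the description above:
#   publisher.stop! timeout
```

Splitting the steps is mainly useful when other cleanup work should run between refusing new messages and blocking on the drain.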

Parameter
  • timeout (Number, nil) — The number of seconds to block until the publisher is fully stopped. The default (nil) blocks indefinitely.
Returns
  • (AsyncPublisher) — returns self so calls can be chained.

#stopped?

def stopped?() -> Boolean

Whether the publisher has been stopped.

Returns
  • (Boolean) — true when stopped, false otherwise.

#topic_name

def topic_name() -> String

The name of the topic the messages are published to. The value is a fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.

Returns
  • (String) — the current value of topic_name
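For illustration, the fully-qualified form can be split into its two identifiers with a small pattern match. The constant TOPIC_NAME_PATTERN and the helper parse_topic_name are hypothetical and not part of the library.

```ruby
# Matches projects/{project_id}/topics/{topic_id} and captures both IDs.
TOPIC_NAME_PATTERN = %r{\Aprojects/(?<project_id>[^/]+)/topics/(?<topic_id>[^/]+)\z}

# Hypothetical helper: returns the project and topic IDs as a Hash,
# or raises ArgumentError if the name is not fully qualified.
def parse_topic_name name
  match = TOPIC_NAME_PATTERN.match name
  raise ArgumentError, "not a fully-qualified topic name: #{name.inspect}" unless match

  { project_id: match[:project_id], topic_id: match[:topic_id] }
end

parse_topic_name "projects/my-project/topics/my-topic"
# => { project_id: "my-project", topic_id: "my-topic" }
```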

#wait!

def wait!(timeout = nil) -> AsyncPublisher

Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.

Does not stop the publisher. To stop the publisher, first call #stop and then call #wait! to block until the publisher is stopped.

Parameter
  • timeout (Number, nil) — The number of seconds to block until the publisher is fully stopped. The default (nil) blocks indefinitely.
Returns
  • (AsyncPublisher) — returns self so calls can be chained.

Constants

PUBLISH_RETRY_ERRORS

value: [ GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal, GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable ].freeze