Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::AsyncPublisher.
Used to publish multiple messages in batches to a topic. See Publisher#async_publisher.
Inherits
- Object
Includes
- MonitorMixin
Example
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher "my-topic"
publisher.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

publisher.async_publisher.stop!
Methods
#callback_threads
def callback_threads() -> Numeric
The number of threads to handle the published messages' callbacks. Default is 4.
- (Numeric) — the current value of callback_threads
#enable_message_ordering!
def enable_message_ordering!()
Enables message ordering for messages with ordering keys. When
enabled, messages published with the same ordering_key will be
delivered in the order they were published.
See #message_ordering?. See Publisher#publish_async, Subscriber#listen, and Message#ordering_key.
#flow_control
def flow_control()
Returns the value of attribute flow_control.
#flush
def flush() -> AsyncPublisher
Forces all messages in the current batch to be published immediately.
- (AsyncPublisher) — returns self so calls can be chained.
#interval
def interval() -> Numeric
The number of seconds to collect messages before the batch is published. Default is 0.01.
- (Numeric) — the current value of interval
#max_bytes
def max_bytes() -> Integer
The maximum total size, in bytes, of messages to be collected before the batch is published. Default is 1,000,000 (1 MB).
- (Integer) — the current value of max_bytes
#max_messages
def max_messages() -> Integer
The maximum number of messages to be collected before the batch is published. Default is 100.
- (Integer) — the current value of max_messages
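To make the max_bytes and max_messages limits concrete, here is a toy model of size- and count-based batching. It is a sketch only, not the library's implementation: the ToyBatcher class and its method names are hypothetical, and the time-based trigger (interval) is left out so the behavior stays deterministic.

```ruby
# Toy model of size- and count-based batching. This is NOT the library's
# implementation; ToyBatcher and its method names are hypothetical.
class ToyBatcher
  attr_reader :published_batches

  def initialize max_bytes: 1_000_000, max_messages: 100
    @max_bytes = max_bytes
    @max_messages = max_messages
    @current = []
    @current_bytes = 0
    @published_batches = []
  end

  # Adds a payload and publishes the batch once either limit is reached.
  def add data
    @current << data
    @current_bytes += data.bytesize
    flush if @current.size >= @max_messages || @current_bytes >= @max_bytes
    self
  end

  # Publishes whatever has been collected so far, like an explicit flush.
  def flush
    return self if @current.empty?
    @published_batches << @current
    @current = []
    @current_bytes = 0
    self
  end
end

batcher = ToyBatcher.new max_bytes: 10, max_messages: 3
%w[a b c].each { |m| batcher.add m } # the third message reaches max_messages
batcher.add "0123456789"             # a single 10-byte message reaches max_bytes
batcher.add "tail"                   # stays pending until flushed
batcher.flush
```

In this model a batch leaves the buffer as soon as either limit is hit; the real publisher may differ in details such as how it treats a message that would overflow the current batch.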
#message_ordering?
def message_ordering?() -> Boolean
Whether message ordering for messages with ordering keys has been
enabled. When enabled, messages published with the same ordering_key
will be delivered in the order they were published. When disabled,
messages may be delivered in any order.
See #enable_message_ordering!. See Publisher#publish_async, Subscriber#listen, and Message#ordering_key.
- (Boolean)
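As a minimal sketch of how the two ordering methods fit together, the helper below enables ordering only when it is not already on, then publishes a list of payloads under one ordering key. The helper name publish_in_order is hypothetical; the only calls made on the publisher are #message_ordering?, #enable_message_ordering!, and #publish as documented on this page.

```ruby
# Hypothetical helper; `async_publisher` is assumed to respond to the
# methods documented on this page (an AsyncPublisher or a test double).
def publish_in_order async_publisher, ordering_key, payloads
  # Enable ordering once, before the first keyed message is added.
  async_publisher.enable_message_ordering! unless async_publisher.message_ordering?

  # Messages sharing the same ordering_key are added in publish order.
  payloads.each do |data|
    async_publisher.publish data, ordering_key: ordering_key
  end

  async_publisher
end
```

Because the helper depends only on duck-typed methods, it can be exercised with a small stand-in object in tests before being pointed at a real publisher.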
#publish
def publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback) { |result| ... }
Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Publisher#publish_async.
- data (String, File) — The message payload. This will be converted to bytes encoded as ASCII-8BIT.
- attributes (Hash) — Optional attributes for the message.
- ordering_key (String) (defaults to: nil) — Identifies related messages for which publish order should be respected.
- (result) — the callback for when the message has been published
- result (PublishResult) — the result of the asynchronous publish
- (Google::Cloud::PubSub::AsyncPublisherStopped) — when the publisher is stopped. (See #stop and #stopped?.)
- (Google::Cloud::PubSub::OrderedMessagesDisabled) — when publishing a message with an ordering_key but ordered messages are not enabled. (See #message_ordering? and #enable_message_ordering!.)
- (Google::Cloud::PubSub::OrderingKeyError) — when publishing a message with an ordering_key that has already failed when publishing. Use #resume_publish to allow this ordering_key to be published again.
#publish_threads
def publish_threads() -> Numeric
The number of threads used to publish messages. Default is 2.
- (Numeric) — the current value of publish_threads
#resume_publish
def resume_publish(ordering_key) -> boolean
Resume publishing ordered messages for the provided ordering key.
- ordering_key (String) — Identifies related messages for which publish order should be respected.
- (boolean) — true when resumed, false otherwise.
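Since #resume_publish reports whether a key was actually resumed, an application that tracks several failed ordering keys might use the return value to learn which ones are usable again. The sketch below assumes exactly that; the helper name resume_keys is hypothetical.

```ruby
# Hypothetical helper: tries to resume each ordering key and returns only
# the keys for which resume_publish reported true.
def resume_keys async_publisher, ordering_keys
  ordering_keys.select { |key| async_publisher.resume_publish key }
end
```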
#started?
def started?() -> boolean
Whether the publisher has been started.
- (boolean) — true when started, false otherwise.
#stop
def stop() -> AsyncPublisher
Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.
- (AsyncPublisher) — returns self so calls can be chained.
#stop!
def stop!(timeout = nil) -> AsyncPublisher
Stop this publisher and block until the publisher is fully stopped,
all pending messages have been published, and all callbacks have
completed, or until timeout seconds have passed.
- timeout (Number, nil) — The number of seconds to block until the publisher is fully stopped. Default will block indefinitely.
- (AsyncPublisher) — returns self so calls can be chained.
#stopped?
def stopped?() -> boolean
Whether the publisher has been stopped.
- (boolean) — true when stopped, false otherwise.
#topic_name
def topic_name() -> String
The name of the topic the messages are published to. The value is a
fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.
- (String) — the current value of topic_name
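If the project and topic IDs are needed separately, the fully-qualified form can be split with a small pattern. This is an illustrative sketch: TOPIC_NAME_PATTERN and split_topic_name are hypothetical names, not part of the library, and the pattern assumes neither ID contains a slash.

```ruby
# Hypothetical pattern for projects/{project_id}/topics/{topic_id}.
TOPIC_NAME_PATTERN = %r{\Aprojects/(?<project_id>[^/]+)/topics/(?<topic_id>[^/]+)\z}

# Returns a hash with :project_id and :topic_id, or nil when the string is
# not a fully-qualified topic name.
def split_topic_name topic_name
  match = TOPIC_NAME_PATTERN.match topic_name
  return nil unless match

  { project_id: match[:project_id], topic_id: match[:topic_id] }
end
```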
#wait!
def wait!(timeout = nil) -> AsyncPublisher
Blocks until the publisher is fully stopped, all pending messages have
been published, and all callbacks have completed, or until timeout
seconds have passed.
Does not stop the publisher. To stop the publisher, first call #stop and then call #wait! to block until the publisher is stopped.
- timeout (Number, nil) — The number of seconds to block until the publisher is fully stopped. Default will block indefinitely.
- (AsyncPublisher) — returns self so calls can be chained.
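The two-step shutdown described above (#stop, then #wait!) can be sketched as a small helper. The name drain_publisher and the optional block are assumptions for illustration; the only publisher calls used are #stop and #wait! as documented here.

```ruby
# Hypothetical helper: stops the publisher, optionally runs other cleanup
# while queued messages drain, then blocks until done or until the timeout.
def drain_publisher async_publisher, timeout: nil
  async_publisher.stop          # no new messages are accepted after this
  yield if block_given?         # optional work while the queue drains
  async_publisher.wait! timeout # nil blocks indefinitely, per the docs above
end
```

When nothing needs to happen between the two steps, calling #stop! with the same timeout is the shorter form.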
Constants
PUBLISH_RETRY_ERRORS
value: [
GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal,
GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable
].freeze