Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::Subscriber.
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen.
Inherits
- Object
Includes
- MonitorMixin
Example
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
Methods
#callback
def callback() -> Proc
The procedure that will handle the messages received from the subscription.
- (Proc) — the current value of callback
#callback_threads
def callback_threads() -> Integer
The number of threads used to handle the received messages. Default is 8.
- (Integer) — the current value of callback_threads
#deadline
def deadline() -> Numeric
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
- (Numeric) — the current value of deadline
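The bounds above can be checked before a value is passed along. The following is a minimal sketch in plain Ruby. `DeadlineBounds` is a hypothetical helper and not part of the library, and clamping is an assumption made for illustration: this page does not say whether the library clamps or rejects out-of-range values.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# The bounds and the default are taken from the #deadline description.
module DeadlineBounds
  MIN = 10
  MAX = 600
  DEFAULT = 60

  # Returns the default when no value is given, otherwise the value
  # limited to the documented range (assumed behavior).
  def self.clamp(seconds = nil)
    return DEFAULT if seconds.nil?

    seconds.clamp(MIN, MAX)
  end
end

DeadlineBounds.clamp      # => 60
DeadlineBounds.clamp(5)   # => 10
DeadlineBounds.clamp(900) # => 600
```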
#inventory
def inventory() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
#inventory_bytesize
def inventory_bytesize() -> Integer
The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
- (Integer) — The maximum number of bytes.
#inventory_extension
def inventory_extension() -> Integer
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
- (Integer) — The maximum number of seconds.
#inventory_limit
def inventory_limit() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
#last_error
def last_error() -> Exception, nil
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
- (Exception, nil) — The most recent error raised.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
#max_duration_per_lease_extension
def max_duration_per_lease_extension() -> Integer
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
- (Integer) — The maximum number of seconds.
#max_outstanding_bytes
def max_outstanding_bytes() -> Integer
The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
- (Integer) — The maximum number of bytes.
#max_outstanding_messages
def max_outstanding_messages() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
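Read together, #max_outstanding_bytes and #max_outstanding_messages cap how much the subscriber holds at once. A rough sketch of such a check follows, assuming collection pauses once either limit is reached. That comparison is an assumption for illustration, and `InventoryLimits` is a hypothetical struct, not a library class.

```ruby
# Hypothetical illustration, not part of google-cloud-pubsub.
# Defaults mirror the documented values: 1,000 messages and 100,000,000 bytes.
InventoryLimits = Struct.new(:max_outstanding_messages, :max_outstanding_bytes) do
  # True when either documented limit has been reached (assumed semantics).
  def full?(message_count, total_bytes)
    message_count >= max_outstanding_messages || total_bytes >= max_outstanding_bytes
  end
end

limits = InventoryLimits.new(1_000, 100_000_000)

limits.full?(999, 50_000_000)  # => false
limits.full?(1_000, 0)         # => true
limits.full?(10, 100_000_000)  # => true
```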
#max_total_lease_duration
def max_total_lease_duration() -> Integer
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
- (Integer) — The maximum number of seconds.
#message_ordering
def message_ordering() -> Boolean
Whether message ordering has been enabled.
- (Boolean) — the current value of message_ordering
#on_error
def on_error(&block) { |error| ... }
Register to be notified of errors when raised.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
- (callback) — The block to be called when an error is raised.
- error (Exception) — The error raised.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
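Because multiple error handlers can be added, each registered block is expected to see an error when it is raised. The sketch below illustrates that registration pattern in plain Ruby. `ErrorNotifier` and its `report` method are hypothetical stand-ins rather than the library's Subscriber, and calling handlers in registration order is an assumption.

```ruby
# Hypothetical stand-in, not the library's Subscriber class.
class ErrorNotifier
  attr_reader :last_error

  def initialize
    @handlers = []
    @last_error = nil
  end

  # Registers another handler; earlier handlers are kept.
  def on_error(&block)
    @handlers << block
    nil
  end

  # Records the error and passes it to every registered handler
  # in registration order (assumed ordering).
  def report(error)
    @last_error = error
    @handlers.each { |handler| handler.call error }
    nil
  end
end

notifier = ErrorNotifier.new
seen = []
notifier.on_error { |error| seen << "log: #{error.message}" }
notifier.on_error { |error| seen << "metric: #{error.class}" }

notifier.report RuntimeError.new("stream closed")
seen # => ["log: stream closed", "metric: RuntimeError"]
```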
#push_threads
def push_threads() -> Integer
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
- (Integer) — the current value of push_threads
#start
def start() -> Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
- (Subscriber) — returns self so calls can be chained.
#started?
def started?() -> boolean
Whether the subscriber has been started.
- (boolean) — true when started, false otherwise.
#stop
def stop() -> Subscriber
Immediately stops the subscriber. No new messages will be pulled from the subscription. Actions already taken on received messages that have not yet been sent to the API will be sent to the API, and messages that were received but not yet processed will be released back to the API and redelivered. Use #wait! to block until all received messages have been processed or released.
- (Subscriber) — returns self so calls can be chained.
#stop!
def stop!(timeout = nil) -> Subscriber
Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
- timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. The default (nil) blocks indefinitely.
- (Subscriber) — returns self so calls can be chained.
#stopped?
def stopped?() -> boolean
Whether the subscriber has been stopped.
- (boolean) — true when stopped, false otherwise.
#streams
def streams() -> Integer
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
- (Integer) — the current value of streams
#subscription_name
def subscription_name() -> String
The name of the subscription the messages are pulled from.
- (String) — the current value of subscription_name
#use_legacy_flow_control?
def use_legacy_flow_control?() -> Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.
- (Boolean) — true when only client side flow control is enforced, false when both client and server side flow control are enforced.
#wait!
def wait!(timeout = nil) -> Subscriber
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.
- timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. The default (nil) blocks indefinitely.
- (Subscriber) — returns self so calls can be chained.
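The difference between waiting and stopping can be sketched with a plain Ruby thread. `SketchWorker` is a hypothetical stand-in that only mimics the method names documented here. It is not the library's implementation and pulls no messages. It shows that a timed wait returns without stopping the worker, and that calling stop first lets a later wait finish.

```ruby
# Hypothetical stand-in that mirrors the documented method names only.
class SketchWorker
  def initialize
    @stopped = false
    @thread = nil
  end

  # Starts a background thread that runs until stop is called.
  def start
    @thread = Thread.new { sleep 0.01 until @stopped }
    self
  end

  # Signals the background thread to finish; does not block.
  def stop
    @stopped = true
    self
  end

  def stopped?
    @stopped
  end

  # Blocks until the thread finishes or the timeout passes.
  # A nil timeout blocks indefinitely. Does not stop the worker.
  def wait!(timeout = nil)
    @thread&.join(timeout)
    self
  end
end

worker = SketchWorker.new.start

worker.wait! 0.05                  # returns after the timeout
still_running = !worker.stopped?   # the timed wait did not stop it

worker.stop.wait!                  # stop first, then block until finished
```

Each method returns self in the sketch, which is what allows `worker.stop.wait!` to be chained.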