Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::MessageListener.
MessageListener object used to stream and process messages from a Subscriber. See Subscriber#listen
Inherits
- Object
Includes
- MonitorMixin
Example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new subscriber = pubsub.subscriber "my-topic-sub" listener = subscriber.listen do |received_message| # process message received_message.acknowledge! end # Start background threads that will call the block passed to listen. listener.start # Shut down the subscriber when ready to stop receiving messages. listener.stop!
Methods
#callback
def callback() -> Proc
The procedure that will handle the messages received from the subscription.
- (Proc) — the current value of callback
#callback_threads
def callback_threads() -> Integer
The number of threads used to handle the received messages. Default is 8.
- (Integer) — the current value of callback_threads
#deadline
def deadline() -> Numeric
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
- (Numeric) — the current value of deadline
#last_error
def last_error() -> Exception, nil
The most recent unhandled error to occur while listening to messages on the listener.
If an unhandled error has occurred the listener will attempt to recover from the error and resume listening.
- (Exception, nil) — error The most recent error raised.
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new subscriber = pubsub.subscriber "my-topic-sub" listener = subscriber.listen do |received_message| # process message received_message.acknowledge! end # Start listening for messages and errors. listener.start # If an error was raised, it can be retrieved here: listener.last_error #=> nil # Shut down the subscriber when ready to stop receiving messages. listener.stop!
#max_duration_per_lease_extension
def max_duration_per_lease_extension() -> Integer
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the listener fails to extend the deadline. Default is 0 (disabled).
- (Integer) — The maximum number of seconds.
#max_outstanding_bytes
def max_outstanding_bytes() -> Integer
The total byte size of received messages to be collected by listener. Default is 100,000,000 (100MB).
- (Integer) — The maximum number of bytes.
#max_outstanding_messages
def max_outstanding_messages() -> Integer
The number of received messages to be collected by listener. Default is 1,000.
- (Integer) — The maximum number of messages.
#max_total_lease_duration
def max_total_lease_duration() -> Integer
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
- (Integer) — The maximum number of seconds.
#message_ordering
def message_ordering() -> Boolean
Whether message ordering has been enabled.
- (Boolean) — the current value of message_ordering
#min_duration_per_lease_extension
def min_duration_per_lease_extension() -> Integer
The minimum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the listener fails to extend the deadline. Default is 0 (disabled).
- (Integer) — The minimum number of seconds.
#on_error
def on_error(&block) { |error| ... }
Register to be notified of errors when raised.
If an unhandled error has occurred the listener will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
- (callback) — The block to be called when an error is raised.
- error (Exception) — The error raised.
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new subscriber = pubsub.subscriber "my-topic-sub" listener = subscriber.listen do |received_message| # process message received_message.acknowledge! end # Register to be notified when unhandled errors occur. listener.on_error do |error| # log error puts error end # Start listening for messages and errors. listener.start # Shut down the subscriber when ready to stop receiving messages. listener.stop!
#push_threads
def push_threads() -> Integer
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
- (Integer) — the current value of push_threads
#start
def start() -> MessageListener
Starts the listener pulling from the subscription and processing the received messages.
- (MessageListener) — returns self so calls can be chained.
#started?
def started?() -> boolean
Whether the listener has been started.
-
(boolean) —
true
when started,false
otherwise.
#stop
def stop() -> MessageListener
Immediately stops the listener. No new messages will be pulled from the subscription. Use #wait! to block until all received messages have been processed or released: All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered.
- (MessageListener) — returns self so calls can be chained.
#stop!
def stop!(timeout = nil) -> MessageListener
Stop this listener and block until the listener is fully stopped
and all received messages have been processed or released, or until
timeout
seconds have passed.
- timeout (Number, nil) — The number of seconds to block until the listener is fully stopped. Default will block indefinitely.
- (MessageListener) — returns self so calls can be chained.
#stopped?
def stopped?() -> boolean
Whether the listener has been stopped.
-
(boolean) —
true
when stopped,false
otherwise.
#streams
def streams() -> Integer
The number of concurrent streams to open to pull messages from the subscription. Default is 1.
- (Integer) — the current value of streams
#subscription_name
def subscription_name() -> String
The name of the subscription the messages are pulled from.
- (String) — the current value of subscription_name
#wait!
def wait!(timeout = nil) -> MessageListener
Blocks until the listener is fully stopped and all received messages
have been processed or released, or until timeout
seconds have
passed.
Does not stop the listener. To stop the listener, first call #stop and then call #wait! to block until the listener is stopped.
- timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.
- (MessageListener) — returns self so calls can be chained.