Cloud Pub/Sub API - Class Google::Cloud::PubSub::Subscriber (v2.9.2)

Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::Subscriber.

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen

Inherits

  • Object

Includes

  • MonitorMixin

Example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Methods

#callback

def callback() -> Proc

The procedure that will handle the messages received from the subscription.

Returns
  • (Proc) — the current value of callback

#callback_threads

def callback_threads() -> Integer

The number of threads used to handle the received messages. Default is 8.

Returns
  • (Integer) — the current value of callback_threads

#deadline

def deadline() -> Numeric

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns
  • (Numeric) — the current value of deadline

#inventory

def inventory() -> Integer

The number of received messages to be collected by subscriber. Default is 1,000.

Returns
  • (Integer) — The maximum number of messages.

#inventory_bytesize

def inventory_bytesize() -> Integer

The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).

Returns
  • (Integer) — The maximum number of bytes.

#inventory_extension

def inventory_extension() -> Integer

The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).

Returns
  • (Integer) — The maximum number of seconds.

#inventory_limit

def inventory_limit() -> Integer

The number of received messages to be collected by subscriber. Default is 1,000.

Returns
  • (Integer) — The maximum number of messages.

#last_error

def last_error() -> Exception, nil

The most recent unhandled error to occur while listening to messages on the subscriber.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

Returns
  • (Exception, nil) — error The most recent error raised.
Example
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

#max_duration_per_lease_extension

def max_duration_per_lease_extension() -> Integer

The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).

Returns
  • (Integer) — The maximum number of seconds.

#max_outstanding_bytes

def max_outstanding_bytes() -> Integer

The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).

Returns
  • (Integer) — The maximum number of bytes.

#max_outstanding_messages

def max_outstanding_messages() -> Integer

The number of received messages to be collected by subscriber. Default is 1,000.

Returns
  • (Integer) — The maximum number of messages.

#max_total_lease_duration

def max_total_lease_duration() -> Integer

The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).

Returns
  • (Integer) — The maximum number of seconds.

#message_ordering

def message_ordering() -> Boolean

Whether message ordering has been enabled.

Returns
  • (Boolean) — the current value of message_ordering

#on_error

def on_error(&block) { |error| ... }

Register to be notified of errors when raised.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

Multiple error handlers can be added.

Yields
  • (callback) — The block to be called when an error is raised.
Yield Parameter
  • error (Exception) — The error raised.
Example
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

#push_threads

def push_threads() -> Integer

The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.

Returns
  • (Integer) — the current value of push_threads

#start

def start() -> Subscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns
  • (Subscriber) — returns self so calls can be chained.

#started?

def started?() -> boolean

Whether the subscriber has been started.

Returns
  • (boolean) — true when started, false otherwise.

#stop

def stop() -> Subscriber

Immediately stops the subscriber. No new messages will be pulled from the subscription. Use #wait! to block until all received messages have been processed or released: All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered.

Returns
  • (Subscriber) — returns self so calls can be chained.

#stop!

def stop!(timeout = nil) -> Subscriber

Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

The same as calling #stop and #wait!.

Parameter
  • timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.
Returns
  • (Subscriber) — returns self so calls can be chained.

#stopped?

def stopped?() -> boolean

Whether the subscriber has been stopped.

Returns
  • (boolean) — true when stopped, false otherwise.

#streams

def streams() -> Integer

The number of concurrent streams to open to pull messages from the subscription. Default is 4.

Returns
  • (Integer) — the current value of streams

#subscription_name

def subscription_name() -> String

The name of the subscription the messages are pulled from.

Returns
  • (String) — the current value of subscription_name

#use_legacy_flow_control?

def use_legacy_flow_control?() -> Boolean

Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.

server side flow control are enforced.

Returns
  • (Boolean) — true when only client side flow control is enforced, false when both client and

#wait!

def wait!(timeout = nil) -> Subscriber

Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Parameter
  • timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.
Returns
  • (Subscriber) — returns self so calls can be chained.