Nachrichten mit Pull empfangen

Pub/Sub unterstützt die Push- und Pull-Nachrichtenzustellung. Einen Überblick und Vergleich von Pull- und Push-Abos finden Sie in Abonnentenübersicht. In diesem Dokument wird die Pull-Zustellung beschrieben. Der Push-Aboleitfaden enthält eine Beschreibung der Push-Zustellung.

Asynchrone Pull-Zustellung

Mit der asynchronen Pull-Zustellung können Sie den Durchsatz Ihrer Anwendung erhöhen, da sie nicht für neue Nachrichten blockiert werden muss. Sie haben die Möglichkeit, Nachrichten in der Anwendung mit einem lange laufenden Nachrichten-Listener zu empfangen und wie im Beispiel unten nacheinander zu bestätigen. Java-, Python-, .NET-, Go- und Ruby-Clients verwenden die streamingPull Service API, um die asynchrone Client API effizient zu implementieren.

Die asynchrone Pull-Zustellung von Nachrichten wird nicht von allen Clientbibliotheken unterstützt. Weitere Informationen zur synchronen Pull-Zustellung von Nachrichten finden Sie im Abschnitt Synchrone Pull-Zustellung.

Weitere Informationen finden Sie in der API-Referenzdokumentation zu Ihrer Programmiersprache.

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

listenForMessages();

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Benutzerdefinierte Attribute verarbeiten

In diesem Beispiel wird gezeigt, wie Nachrichten asynchron abgerufen und die benutzerdefinierten Attribute aus Metadaten aufgerufen werden.

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);
  }

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
          message
              .getAttributesMap()
              .forEach((key, value) -> System.out.println(key + " = " + value));
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message.data))
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print("{}: {}".format(key, value))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Prüfen, ob Fehler aufgetreten sind

In diesem Beispiel wird dargestellt, wie Fehler behandelt werden, die beim Abonnieren von Nachrichten auftreten können.

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);
  }

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setExecutorProvider(executorProvider)
              .build();

      // Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
      // when the current subscriber encounters permanent errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListenerExample(projectId, subscriptionId);
              }
            }
          },
          MoreExecutors.directExecutor());

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = function (error) {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

listenForErrors();

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except Exception as e:
        streaming_pull_future.cancel()
        print(
            "Listening for messages on {} threw an exception: {}.".format(
                subscription_id, e
            )
        )

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

Steuerung des Nachrichtenflusses

Ihr Abonnentenclient verarbeitet und bestätigt Nachrichten möglicherweise langsamer, als Pub/Sub diese an den Client sendet. In diesem Fall:

  • Es ist möglich, dass an einem Client ein Nachrichtenrückstand auftritt, weil er nicht die Kapazität hat, das Volumen der eingehenden Nachrichten zu verarbeiten. Ein anderer Client im Netzwerk hat aber diese Kapazität. Der zweite Client könnte den Rückstand reduzieren, hat aber keine Möglichkeit dazu, da der erste Client ein Vorrecht auf die Verarbeitung der empfangenen Nachrichten hat. Dies reduziert die Verarbeitungsgeschwindigkeit insgesamt, weil die Nachrichten auf dem ersten Client hängen bleiben.

  • Da die Clientbibliothek die Bestätigungsfrist für zurückgestellte Nachrichten wiederholt verlängert, verbrauchen diese Nachrichten weiterhin Speicher-, CPU- und Bandbreitenressourcen. Dadurch können dem Abonnentenclient die Ressourcen (z. B. Speicher) ausgehen, was den Durchsatz und die Latenz der Nachrichtenverarbeitung beeinträchtigen kann.

Wenn Sie diese Probleme minimieren möchten, verwenden Sie die Ablaufsteuerungsfunktionen des Abonnenten. Damit können Sie die Geschwindigkeit steuern, mit der der Abonnent Nachrichten empfängt. Diese Ablaufsteuerungsfunktionen werden in den folgenden Beispielen veranschaulicht:

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithFlowControlAsyncSample
{
    public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName,
            settings: new SubscriberClient.Settings()
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            });
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// client will pull from the server before pausing.
	//
	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
	// When Synchronous is set to false, the StreamingPull RPC is used which
	// can pull a single large batch of messages at once that is greater than
	// MaxOustandingMessages before pausing. For more info, see
	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the client will pull from the server before pausing. Similar
	// to MaxOutstandingMessages, this may be exceeded with a large batch
	// of messages since we cannot control the size of a batch of messages
	// from the server (even with the synchronous Pull RPC).
	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithFlowControlSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
  }

  public static void subscribeWithFlowControlSettingsExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;

    // The subscriber will pause the message stream and stop receiving more messsages from the
    // server if any one of the conditions is met.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            // 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
            // the subscriber receives before pausing the message stream.
            .setMaxOutstandingElementCount(1000L)
            // 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
            // receives before pausing the message stream.
            .setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
            .build();

    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setFlowControlSettings(flowControlSettings)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings() {
  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionName,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  const messageHandler = message => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.close();
  }, timeout * 1000);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Generell zeigt die Notwendigkeit der Ablaufsteuerung, dass Nachrichten mit einer höheren Geschwindigkeit veröffentlicht werden, als sie verarbeitet werden können. Wenn dies ein Dauerzustand ist und kein vorübergehender Anstieg des Nachrichtenvolumens, sollten Sie die Anzahl der Abonnentenclient-Instanzen erhöhen.

Gleichzeitigkeitserkennung

Die Unterstützung für Gleichzeitigkeit ist programmiersprachenabhängig. Bei Sprachimplementierungen, die parallele Threads wie Java und Go unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest. Diese Einstellung ist für Ihre Anwendung möglicherweise nicht optimal. Wenn Sie beispielsweise feststellen, dass Ihre Abonnentenanwendung mit dem eingehenden Nachrichtenvolumen nicht Schritt hält, aber nicht CPU-gebunden ist, sollten Sie die Anzahl der Threads erhöhen. Bei CPU-intensiver Nachrichtenverarbeitung kann es sinnvoll sein, die Anzahl der Threads zu verringern.

Das folgende Beispiel zeigt, wie die Gleichzeitigkeit in einem Abo gesteuert wird.

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"runtime"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	defer close(cm)
	// Handle individual messages in a goroutine.
	go func() {
		for msg := range cm {
			fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
			msg.Ack()
		}
	}()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}

	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Die Unterstützung für Gleichzeitigkeit ist programmiersprachenabhängig. Weitere Informationen finden Sie in der API-Referenzdokumentation.

StreamingPull

Der Pub/Sub-Dienst hat zwei APIs zum Abrufen von Nachrichten:

Wo möglich, verwenden die Cloud-Client-Bibliotheken StreamingPull für maximalen Durchsatz und niedrigste Latenz. Auch wenn Sie die StreamingPull API vielleicht niemals direkt verwenden, ist es dennoch hilfreich, wenn Sie einige wesentliche Attribute von StreamingPull und den Unterschied zur herkömmlichen Pull-Methode kennen.

Die Pull-Methode basiert auf einem Anfrage-/Antwort-Modell:

  1. Der Client sendet eine Anfrage an den Server.
  2. Der Server antwortet mit null oder mehr Nachrichten und trennt die Verbindung.

Die API des StreamingPull-Dienstes benötigt eine bidirektionale Verbindung, um mehrere Nachrichten zu empfangen, sobald diese verfügbar werden.

  1. Der Client sendet eine Anfrage an den Server, um eine Verbindung herzustellen.
  2. Der Server sendet kontinuierlich Nachrichten an den verbundenen Client.
  3. Die Verbindung wird schließlich entweder vom Client oder vom Server beendet.

Sie stellen dem Abonnenten einen Callback bereit und der Abonnent führt den Callback für jede Nachricht asynchron aus. Wenn ein Abonnent Nachrichten mit demselben Reihenfolgenschlüssel empfängt, führen die Clientbibliotheken den Callback nacheinander aus. Der Pub/Sub-Dienst sendet diese Nachrichten an den gleichen Abonnenten auf Best-Effort-Basis.

StreamingPull hat eine (erwartete) Fehlerrate von 100 %

StreamingPull-Streams werden immer mit einem Fehlerstatus beendet. Beachten Sie, dass der Fehler hier anders als bei regulären RPCs lediglich ein Hinweis darauf ist, dass ein Stream unterbrochen wurde – nicht jedoch, dass eine Anfrage fehlgeschlagen ist. Aus diesem Grund ist es möglich, dass die StreamingPull API eine anscheinend überraschende Fehlerrate von 100 % hat. Dies ist designbedingt.

Diagnose von StreamingPull-Fehlern

Da StreamingPull-Streams immer mit einem Fehler enden, ist es nicht hilfreich, die Stream-Beendigungskennzahlen während der Fehlerdiagnose zu untersuchen. Konzentrieren Sie sich stattdessen auf den StreamPull-Messwert für die Nachrichtenoperation subscription/streaming_pull_message_operation_count. Suchen Sie nach folgenden Fehlern:

  • FAILED_PRECONDITION-Fehler können in folgenden Fällen auftreten:
    • Pub/Sub versucht, eine Nachricht mit einem deaktivierten Cloud KMS-Schlüssel zu entschlüsseln.
    • Abos können vorübergehend gesperrt werden, wenn noch nicht versendete Nachrichten mit einem deaktivierten Cloud KMS-Schlüssel verschlüsselt sind.
  • UNAVAILABLE Fehler

StreamingPull: Große Rückstände kleiner Nachrichten verwalten

Das gRPC-StreamingPull-Paket ist für einen hohen Durchsatz optimiert, sodass Nachrichten zwischengespeichert werden. Dies kann Auswirkungen haben, wenn Sie versuchen, statt eines kontinuierlichen Streams neuer Nachrichten große Rückstände von kleinen Nachrichten zu verarbeiten. Unter diesen Bedingungen werden Nachrichten möglicherweise mehrfach zugestellt und nicht per Lastenausgleich auf Clients verteilt und so effektiver verarbeitet.

Der Zwischenspeicher für den Pub/Sub-Dienst und den Speicherplatz des Clientbibliothek-Nutzers beträgt ca. 10 MB. Die Auswirkungen dieses Zwischenspeichers auf das Verhalten der Clientbibliothek werden am folgenden Beispiel deutlich:

  • Es gibt einen Verarbeitungsrückstand von 10.000 1-KB-Nachrichten in einem Abo.
  • Jede Nachricht benötigt 1 Sekunde für die sequenzielle Verarbeitung durch eine Clientinstanz mit einem einzigen Thread.
  • Die erste Clientinstanz, die eine StreamingPull-Verbindung zu dem Dienst für dieses Abo herstellt, füllt den Puffer mit allen 10.000 Nachrichten.
  • Es dauert 10.000 Sekunden (fast drei Stunden), bis der Zwischenspeicher verarbeitet wurde.
  • In dieser Zeit überschreiten einige der gepufferten Nachrichten ihre Bestätigungsfrist und werden noch einmal an denselben Client gesendet, was zu Duplikaten führt.
  • Wenn mehrere Clientinstanzen ausgeführt werden, sind die Nachrichten, die sich im Puffer des einen Clients befinden, für keine Clientinstanzen verfügbar.

Diese Situation tritt nicht ein, wenn die Nachrichten mit einer konstanten Rate (und nicht als einzelner, großer Batch) ankommen: Der Dienst verfügt nie über die gesamten 10 MB an Nachrichten gleichzeitig und kann daher ein effektives Load-Balancing für mehrere Abonnenten durchführen.

Verwenden Sie zur Behebung dieses Problems entweder ein Push-Abo oder die Pull-API, die derzeit in einigen der Cloud-Clientbibliotheken (siehe Abschnitt "Synchroner Pull") und in allen API-Clientbibliotheken verfügbar ist. Weitere Informationen finden Sie in der Dokumentation zu Clientbibliotheken.

Synchrone Pull-Zustellung

In bestimmten Fällen ist die asynchrone Pull-Zustellung für Ihre Anwendung eher ungeeignet. Zum Beispiel dann, wenn die Anwendungslogik Nachrichten mithilfe eines Abfragemusters abruft oder wenn sie die Anzahl der zu einem bestimmten Zeitpunkt vom Client abgerufenen Nachrichten präzise begrenzen muss. Zur Unterstützung solcher Anwendungen unterstützt der Dienst eine synchrone Pull-Methode.

Hier sehen Sie Beispielcode für die Pull-Zustellung und die Bestätigung einer festen Anzahl von Nachrichten:

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Text;
using System.Threading;

public class PullMessagesSyncSample
{
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
        try
        {
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, returnImmediately: false, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return messageCount;
    }
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...
        ackIds.add(message.getAckId());
      }
      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull() {
  const formattedSubscription = subClient.subscriptionPath(
    projectId,
    subscriptionName
  );

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages) {
    console.log(`Received message: ${message.message.data}`);
    ackIds.push(message.ackId);
  }

  // Acknowledge all of the messages. You could also ackknowledge
  // these individually, but this is more efficient.
  const ackRequest = {
    subscription: formattedSubscription,
    ackIds: ackIds,
  };
  await subClient.acknowledge(ackRequest);

  console.log('Done.');
}

synchronousPull().catch(console.error);

PHP

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Protokoll

Anfrage:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Antwort:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

Anfrage:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
    )

    ack_ids = []
    for received_message in response.received_messages:
        print("Received: {}".format(received_message.message.data))
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Pub/Sub stellt eine Liste von Nachrichten bereit. Wenn die Liste mehrere Nachrichten enthält, sortiert Pub/Sub die Nachrichten mit demselben Reihenfolgenschlüssel.

Voraussetzung für die Effektivität der synchronen Pull-Zustellung ist, dass eine hohe Zahl gleichzeitig ausstehender Pull-Anfragen vorliegt. Je höher der Durchsatz des Themas wird, desto mehr Pull-Anfragen werden benötigt. Im Allgemeinen sind für latenzempfindliche Anwendungen asynchrone Pull-Zustellungen zu bevorzugen.

Synchroner Pull mit Lease-Management

Die Verarbeitung einer einzelnen Nachricht kann die vorkonfigurierte Bestätigungsfrist überschreiten, die auch als Lease bezeichnet wird. Um eine erneute Zustellung dieser Nachrichten zu vermeiden, bieten die Clientbibliotheken die Möglichkeit, ihre Bestätigungsfristen zurückzusetzen (mit Ausnahme der Go-Clientbibliothek, die die Bestätigungsfristen für abgefragte Nachrichten automatisch ändert), wie in den folgenden Beispielen gezeigt:

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSync(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
	// the max number of messages the client will hold in memory at a time.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// Receive messages for 5 seconds.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	defer close(cm)
	// Handle individual messages in a goroutine.
	go func() {
		for msg := range cm {
			fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
			msg.Ack()
		}
	}()

	// Receive blocks until the passed in context is done.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil && status.Code(err) != codes.Canceled {
		return fmt.Errorf("Receive: %v", err)
	}

	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    projectId = "tz-playground-bigdata";
    subscriptionId = "uno";

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      List<String> ackIds = new ArrayList<>();

      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  const formattedSubscription = subClient.subscriptionPath(
    projectId,
    subscriptionName
  );

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info(
        "{}: Running {} for {}s".format(
            time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
        )
    )
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    "ack_deadline_seconds": ACK_DEADLINE,
                }
            )
            logger.info(
                "{}: Reset ack deadline for {} for {}s".format(
                    time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE,
                )
            )

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.info(
                "{}: Acknowledged {}".format(
                    time.strftime("%X", time.gmtime()), msg_data
                )
            )
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

Skalieren

Möglicherweise müssen Sie einen Skalierungsmechanismus für Ihre Abonnentenanwendung implementieren, um das Nachrichtenaufkommen zu bewältigen. Die Vorgehensweise hängt von Ihrer Umgebung ab. Sie basiert jedoch im Allgemeinen auf Rückstandmesswerten, die über den Monitoring-Dienst der Google Cloud Operations Suite bereitgestellt werden. Informationen zur Vorgehensweise mit Compute Engine finden Sie unter Anhand von Cloud Monitoring-Messwerten skalieren.

Wechseln Sie zum Abschnitt Publish/Sub der GCP-Messwertliste, um zu erfahren, welche Messwerte programmatisch überwacht werden können.

Wie bei allen dezentralen Diensten müssen Anfragen möglicherweise wiederholt werden.

Duplikate verwalten und Wiederholungsversuche erzwingen

Wenn Sie eine Nachricht nicht vor Ablauf der Bestätigungsfrist bestätigen, sendet Pub/Sub die Nachricht noch einmal. Infolgedessen kann Pub/Sub Nachrichten doppelt senden. Verwenden Sie die Operations Suite von Google Cloud, um Vorgänge mit dem Antwortcode expired zu beobachten, um dieses Problem zu erkennen. Wählen Sie zum Abrufen dieser Daten den Messwert Nachrichtenvorgänge bestätigen aus und gruppieren oder filtern Sie ihn nach dem Label response_code. Beachten Sie, dass response_code ein Systemlabel für einen Messwert ist und kein Messwert an sich.

Verwenden Sie Stackdriver, um nach abgelaufenen Bestätigungsfristen für Nachrichten zu suchen

Verlängern Sie die Nachrichtenfrist, um die Duplizierungsrate zu senken.

  • Clientbibliotheken erledigen die Fristverlängerung automatisch, es gibt jedoch standardmäßig Beschränkungen für die maximal konfigurierbare Fristverlängerung.
  • Wenn Sie eine eigene Clientbibliothek erstellen, können Sie die Bestätigungsfrist mit der Methode modifyAckDeadline verlängern.

Alternativ können Sie festlegen, dass Pub/Sub eine Nachricht noch einmal sendet, indem Sie modifyAckDeadline auf 0 setzen.