Anfragen wiederholen

Fehler bei der Veröffentlichung sind in der Regel auf clientseitige Engpässe zurückzuführen, z. B. unzureichende Dienst-CPUs, eine schlechte Threadintegrität oder Netzwerküberlastung. Die Richtlinie für Wiederholungen des Publishers legt fest, wie oft Pub/Sub versucht, eine Nachricht zuzustellen, und wie lange die Zeit zwischen den einzelnen Versuchen ist.

In diesem Dokument erfahren Sie mehr über die Verwendung von Wiederholungsanfragen mit Nachrichten, die in einem Thema veröffentlicht wurden.

Hinweise

Bevor du den Veröffentlichungsworkflow konfigurierst, musst du die folgenden Aufgaben erledigt haben:

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub Publisher (roles/pubsub.publisher) für das Thema zuzuweisen, damit Sie die Berechtigungen erhalten, die Sie zum Wiederholen von Nachrichtenanfragen an ein Thema benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Sie benötigen zusätzliche Berechtigungen, um Themen und Abos zu erstellen oder zu aktualisieren.

Anfragen wiederholen

Wiederholungseinstellungen steuern, wie die Pub/Sub-Clientbibliotheken Veröffentlichungsanfragen wiederholen. Die Clientbibliotheken haben eine der folgenden Wiederholungseinstellungen:

  • Zeitüberschreitung der ersten Anfrage: Die Zeitspanne, nach der eine Clientbibliothek nicht mehr auf die Fertigstellung der ersten Veröffentlichungsanfrage wartet.
  • Wiederholungsverzögerung: Die Zeitspanne, die eine Clientbibliothek nach der Zeitüberschreitung einer Anfrage wartet, bevor sie die Anfrage wiederholt.
  • Gesamtzeitüberschreitung: Die Zeitspanne, nach der eine Clientbibliothek keine weiteren Veröffentlichungsanfragen mehr wiederholt.

Damit Veröffentlichungsanfragen wiederholt werden können, muss die Zeitüberschreitung der ersten Anfrage kürzer sein als die Gesamtzeitüberschreitung. Wenn Sie beispielsweise exponentiellen Backoff verwenden, berechnen die Clientbibliotheken das Anfragezeitlimit und die Wiederholungsverzögerung folgendermaßen:

  • Nach jeder Veröffentlichungsanfrage erhöht sich das Anfragezeitlimit um den Zeitüberschreitungsfaktor der Anfrage bis zum maximalen Anfragezeitlimit.
  • Nach jedem Wiederholungsversuch erhöht sich die Wiederholungsverzögerung um den Wiederholungsverzögerungsfaktor bis zur maximalen Wiederholungsverzögerung.

Nachrichtenanfrage noch einmal senden

Während des Veröffentlichungsvorgangs können vorübergehende oder dauerhafte Veröffentlichungsfehler auftreten. Bei vorübergehenden Fehlern müssen Sie in der Regel nichts unternehmen, da Pub/Sub die Nachrichten automatisch noch einmal sendet.

Ein Fehler kann auch auftreten, wenn ein Veröffentlichungsvorgang erfolgreich war, die Veröffentlichungsantwort aber nicht rechtzeitig vom Publisher-Client empfangen wurde. Auch in diesem Fall wird der Vorgang „veröffentlichen“ wiederholt. Daher können zwei identische Nachrichten mit unterschiedlichen Nachrichten-IDs vorhanden sein.

Bei anhaltenden Fehlern sollten Sie geeignete Maßnahmen außerhalb des Veröffentlichungsprozesses ergreifen, um Pub/Sub nicht zu überlasten.

Fehlgeschlagene Veröffentlichungen werden automatisch wiederholt, außer bei Fehlern, die keine Wiederholungen rechtfertigen. Mit diesem Beispielcode wird dargestellt, wie ein Publisher mit benutzerdefinierten Wiederholungseinstellungen erstellt wird. Beachten Sie, dass nicht alle Clientbibliotheken benutzerdefinierte Wiederholungseinstellungen unterstützen. Dazu wird auf die API-Referenzdokumentation für die Sprache Ihrer Wahl verwiesen:

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));

  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                               maxAttempts: maxAttempts,
                               initialBackoff: initialBackoff,
                               maxBackoff: maxBackoff,
                               backoffMultiplier: backoffMultiplier,
                               retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                       .WithTimeout(totalTimeout)
            }
        }.BuildAsync();
        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	vkit "cloud.google.com/go/pubsub/apiv1"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
)

func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()

	config := &pubsub.ClientConfig{
		PublisherCallOptions: &vkit.PublisherCallOptions{
			Publish: []gax.CallOption{
				gax.WithRetry(func() gax.Retryer {
					return gax.OnCodes([]codes.Code{
						codes.Aborted,
						codes.Canceled,
						codes.Internal,
						codes.ResourceExhausted,
						codes.Unknown,
						codes.Unavailable,
						codes.DeadlineExceeded,
					}, gax.Backoff{
						Initial:    250 * time.Millisecond, // default 100 milliseconds
						Max:        60 * time.Second,       // default 60 seconds
						Multiplier: 1.45,                   // default 1.3
					})
				}),
			},
		},
	}

	client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {PubSub} = require('@google-cloud/pubsub');

async function publishWithRetrySettings(topicNameOrId, data) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {PubSub} from '@google-cloud/pubsub';

async function publishWithRetrySettings(topicNameOrId: string, data: string) {
  const pubsubClient = new PubSub();

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  //
  // Reference this document to see the current defaults for publishing:
  // https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
  //
  // Please note that _all_ items must be included when passing these settings to topic().
  // Otherwise, unpredictable (incorrect) defaults may be assumed.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 4,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 60000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 60000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubsubClient.topic(topicNameOrId, {
    gaxOpts: {
      retry: retrySettings,
    },
  });

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messageId = await topic.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")

Anfragen mit Sortierungsschlüsseln wiederholen

Angenommen, Sie haben einen einzelnen Publisher-Client. Sie verwenden die Pub/Sub-Clientbibliotheken, um Nachrichten 1, 2 und 3 für denselben Sortierungsschlüssel A zu veröffentlichen. Angenommen, die veröffentlichte Antwort für Nachricht 1 wird vom Publisher-Client nicht vor Ablauf der RPC-Frist empfangen. Nachricht 1 muss noch einmal veröffentlicht werden. Die Abfolge der vom Abonnentenclient empfangenen Nachrichten lautet dann 1, 1, 2 und 3, wenn davon ausgegangen wird, dass Nachricht 2 erst veröffentlicht wird, nachdem Nachricht 1 erfolgreich abgeschlossen wurde. Jede veröffentlichte Nachricht hat eine eigene Nachrichten-ID. Aus Sicht des Abonnentenclients wurden vier Nachrichten veröffentlicht, wobei die ersten beiden identisch sind.

Das erneute Ausführen von Veröffentlichungsanfragen mit Sortierungsschlüsseln kann auch durch Batcheinstellungen erschwert werden. Die Clientbibliothek gruppiert Nachrichten für eine effizientere Veröffentlichung. Fahren Sie mit dem vorherigen Beispiel fort und nehmen Sie an, dass Nachrichten 1 und 2 in einem Batch zusammengefasst werden. Dieser Batch wird als einzelne Anfrage an den Server gesendet. Wenn der Server nicht rechtzeitig eine Antwort zurückgibt, versucht der Publisher-Client noch einmal, diese beiden Nachrichten zu senden. Daher ist es möglich, dass der Abonnentenclient die Nachrichten 1, 2, 1, 2 und 3 empfängt. Wenn Sie eine Pub/Sub-Clientbibliothek zum geordneten Veröffentlichen von Nachrichten verwenden und ein Veröffentlichungsvorgang fehlschlägt, schlägt der Dienst die Veröffentlichungsvorgänge für alle verbleibenden Nachrichten mit demselben Reihenfolgeschlüssel fehl. Ein Publisher-Client kann dann einen der folgenden Schritte ausführen:

  • Alle fehlgeschlagenen Nachrichten der Reihe nach noch einmal veröffentlichen

  • Einen Teil der fehlgeschlagenen Nachrichten in der richtigen Reihenfolge noch einmal veröffentlichen

  • Neue Nachrichten veröffentlichen

Wenn ein nicht wiederholbarer Fehler auftritt, veröffentlicht die Clientbibliothek die Nachricht nicht und veröffentlicht keine weiteren Nachrichten mit demselben Reihenfolgeschlüssel. Wenn ein Publisher beispielsweise eine Nachricht an ein nicht vorhandenes Thema sendet, tritt ein nicht wiederholbarer Fehler auf. Wenn Sie weitere Nachrichten mit demselben Reihenfolgenschlüssel veröffentlichen möchten, rufen Sie eine Methode auf, um die Veröffentlichung fortzusetzen und dann wieder zu veröffentlichen.

Im folgenden Beispiel wird gezeigt, wie Nachrichten mit demselben Reihenfolgeschlüssel wieder veröffentlicht werden.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ResumePublishSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.EnableMessageOrdering = true
	key := "some-ordering-key"

	res := t.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	_, err = res.Get(ctx)
	if err != nil {
		// Error handling code can be added here.
		fmt.Printf("Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		t.ResumePublish(key)
	}

	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function resumePublish(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    messageOrdering: true,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publisher = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  try {
    const message = {
      data: dataBuffer,
      orderingKey: orderingKey,
    };
    const messageId = await publisher.publishMessage(message);
    console.log(`Message ${messageId} published.`);

    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    publisher.resumePublishing(orderingKey);
    return null;
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key" do |result|
    if result.succeeded?
      puts "Message ##{i} successfully published."
    else
      puts "Message ##{i} failed to publish"
      # Allow publishing to continue on "ordering-key" after processing the
      # failure.
      topic.resume_publish "ordering-key"
    end
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!

Nächste Schritte

Informationen zum Konfigurieren erweiterter Veröffentlichungsoptionen finden Sie unter den folgenden Links: