Anfragen wiederholen

Veröffentlichungsfehler werden in der Regel clientseitig verursacht Engpässe, z. B. unzureichende Dienst-CPUs, fehlerhafter Thread-Zustand oder Netzwerküberlastung. In der Wiederholungsrichtlinie des Publishers ist die Anzahl der der Zeit, in der Pub/Sub versucht, eine Nachricht zu senden, und die Dauer zwischen den einzelnen Versuchen.

Dieses Dokument enthält Informationen zur Verwendung von Wiederholungsanfragen mit Nachrichten zu einem Thema veröffentlicht.

Hinweise

Bevor Sie den Veröffentlichungsworkflow konfigurieren, müssen Sie die folgenden Schritte ausgeführt haben Aufgaben:

Erforderliche Rollen

So erhalten Sie die Berechtigungen, die Sie zum Wiederholen von Nachrichtenanfragen an ein Thema benötigen: bitten Sie Ihren Administrator, Ihnen Die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) für das Thema. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Sie benötigen zusätzliche Berechtigungen, um Themen und Abos zu erstellen oder zu aktualisieren.

Wiederholungsanfragen

Wiederholungseinstellungen steuern, wie die Pub/Sub-Clientbibliotheken Veröffentlichungsanfragen wiederholen. Die Clientbibliotheken haben eine der folgenden Wiederholungseinstellungen:

  • Zeitüberschreitung der ersten Anfrage: Die Zeitspanne, nach der eine Clientbibliothek nicht mehr auf die Fertigstellung der ersten Veröffentlichungsanfrage wartet.
  • Wiederholungsverzögerung: Die Zeitspanne, die eine Clientbibliothek nach der Zeitüberschreitung einer Anfrage wartet, bevor sie die Anfrage wiederholt.
  • Gesamtzeitüberschreitung: Die Zeitspanne, nach der eine Clientbibliothek keine weiteren Veröffentlichungsanfragen mehr wiederholt.

Damit Veröffentlichungsanfragen wiederholt werden können, muss die Zeitüberschreitung der ersten Anfrage kürzer sein als die Gesamtzeitüberschreitung. Wenn Sie beispielsweise exponentiellen Backoff verwenden, berechnen die Clientbibliotheken das Anfragezeitlimit und die Wiederholungsverzögerung folgendermaßen:

  • Nach jeder Veröffentlichungsanfrage erhöht sich das Anfragezeitlimit um den Zeitüberschreitungsfaktor der Anfrage bis zum maximalen Anfragezeitlimit.
  • Nach jedem Wiederholungsversuch erhöht sich die Wiederholungsverzögerung um den Wiederholungsverzögerungsfaktor bis zur maximalen Wiederholungsverzögerung.

Nachrichtenanfrage wiederholen

Während der Veröffentlichung kann es sein, dass eine vorübergehende oder dauerhafte Veröffentlichung erfolgt. Störungen. Bei vorübergehenden Fehlern sind in der Regel keine speziellen Schritte da Pub/Sub die Nachrichten automatisch wiederholt.

Ein Fehler kann auch auftreten, wenn ein Veröffentlichungsvorgang erfolgreich ist, die Veröffentlichung Antwort vom Publisher-Client nicht rechtzeitig erhalten. Auch in diesem Fall wird noch einmal versucht. Daraus folgt, dass Sie zwei Nachrichten erhalten, die identisch sind, mit unterschiedlichen Nachrichten-IDs.

Bei anhaltenden Fehlern sollten Sie die Implementierung entsprechender Maßnahmen außerhalb von des Veröffentlichungsprozesses, damit Pub/Sub nicht überlastet wird.

Fehlgeschlagene Veröffentlichungen werden automatisch wiederholt, mit Ausnahme von Fehlern, die dies Wiederholungsversuche rechtfertigen. In diesem Beispielcode wird die Erstellung eines Publishers mit benutzerdefinierten Wiederholungseinstellungen (beachten Sie, dass nicht alle Clientbibliotheken benutzerdefinierte Wiederholungsversuche unterstützen). Einstellungen Siehe API-Referenzdokumentation für die ausgewählte Sprache):

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));

  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                               maxAttempts: maxAttempts,
                               initialBackoff: initialBackoff,
                               maxBackoff: maxBackoff,
                               backoffMultiplier: backoffMultiplier,
                               retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                       .WithTimeout(totalTimeout)
            }
        }.BuildAsync();
        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	vkit "cloud.google.com/go/pubsub/apiv1"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
)

func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()

	config := &pubsub.ClientConfig{
		PublisherCallOptions: &vkit.PublisherCallOptions{
			Publish: []gax.CallOption{
				gax.WithRetry(func() gax.Retryer {
					return gax.OnCodes([]codes.Code{
						codes.Aborted,
						codes.Canceled,
						codes.Internal,
						codes.ResourceExhausted,
						codes.Unknown,
						codes.Unavailable,
						codes.DeadlineExceeded,
					}, gax.Backoff{
						Initial:    250 * time.Millisecond, // default 100 milliseconds
						Max:        60 * time.Second,       // default 60 seconds
						Multiplier: 1.45,                   // default 1.3
					})
				}),
			},
		},
	}

	client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
  // optional auth parameters
});
async function publishWithRetrySettings(projectId, topicNameOrId, data) {
  const formattedTopic = publisherClient.projectTopicPath(
    projectId,
    topicNameOrId
  );

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messagesElement = {
    data: dataBuffer,
  };
  const messages = [messagesElement];

  // Build the request
  const request = {
    topic: formattedTopic,
    messages: messages,
  };

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 1.3,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 5000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 600000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  const [response] = await publisherClient.publish(request, {
    retry: retrySettings,
  });
  console.log(`Message ${response.messageIds} published.`);
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {v1} from '@google-cloud/pubsub';

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
  // optional auth parameters
});

async function publishWithRetrySettings(
  projectId: string,
  topicNameOrId: string,
  data: string
) {
  const formattedTopic = publisherClient.projectTopicPath(
    projectId,
    topicNameOrId
  );

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messagesElement = {
    data: dataBuffer,
  };
  const messages = [messagesElement];

  // Build the request
  const request = {
    topic: formattedTopic,
    messages: messages,
  };

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 1.3,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 5000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 600000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  const [response] = await publisherClient.publish(request, {
    retry: retrySettings,
  });
  console.log(`Message ${response.messageIds} published.`);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")

Anfragen mit Sortierschlüsseln wiederholen

Angenommen, Sie haben einen einzelnen Publisher-Client. Sie verwenden die Pub/Sub-Clientbibliotheken zum Veröffentlichen der Nachrichten 1, 2 und 3 für die Schlüssel A erhalten. Angenommen, die veröffentlichte Antwort auf Nachricht 1 ist nicht vom Publisher-Client vor Ablauf der RPC-Frist empfangen wird. Nachricht 1 müssen erneut veröffentlicht werden. Die Reihenfolge der vom Abonnentenclient empfangenen Nachrichten wird dann zu 1, 1, 2 und 3, wenn Sie davon ausgehen, dass Nachricht 2 erst nach Nachricht 1 wird erfolgreich abgeschlossen. Jede veröffentlichte Mitteilung hat eine eigene Nachrichten-ID. Aus Sicht des Abonnentenclients wurden vier Nachrichten wobei die ersten beiden denselben Inhalt haben.

Das Wiederholen von Veröffentlichungsanfragen mit Sortierschlüsseln kann auch durch Batchanfragen erschwert werden Einstellungen. Die Clientbibliothek fasst Nachrichten in einem Batch zusammen, um die Effizienz zu steigern. Veröffentlichung. Fahren Sie mit dem vorherigen Beispiel fort und gehen Sie davon aus, dass die Nachrichten 1 und 2 in Batches zusammengefasst. Dieser Batch wird als einzelne Anfrage an den Server gesendet. Wenn wenn der Server keine Antwort rechtzeitig zurückgibt, versucht der Publisher-Client, dies zu wiederholen. einen Stapel von zwei Nachrichten. Daher ist es möglich, dass der Abonnentenclient empfängt die Nachrichten 1, 2, 1, 2 und 3. Wenn Sie Pub/Sub verwenden um Nachrichten der Reihe nach zu veröffentlichen, und ein Veröffentlichungsvorgang schlägt fehl, der Dienst die Veröffentlichungsvorgänge für alle verbleibenden Nachrichten auf demselben Reihenfolgeschlüssel. Ein Publisher-Kunde kann dann entscheiden, einem der folgende Vorgänge:

  • Alle Nachrichten mit Fehlern der Reihe nach noch einmal veröffentlichen

  • Eine Teilmenge der Nachrichten mit Fehlern der Reihe nach noch einmal veröffentlichen

  • Neue Nachrichten veröffentlichen

Wenn ein nicht wiederholbarer Fehler auftritt, veröffentlicht die Clientbibliothek die Nachricht nicht und veröffentlicht keine weiteren Nachrichten mit demselben Reihenfolgeschlüssel. Wenn ein Publisher beispielsweise eine Nachricht an ein nicht vorhandenes Thema sendet, tritt ein nicht wiederholbarer Fehler auf. Wenn Sie weitere Nachrichten mit demselben Reihenfolgenschlüssel veröffentlichen möchten, rufen Sie eine Methode auf, um die Veröffentlichung fortzusetzen und dann wieder zu veröffentlichen.

Im folgenden Beispiel wird gezeigt, wie Nachrichten mit demselben Reihenfolgeschlüssel wieder veröffentlicht werden.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ResumePublishSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Sending messages to the same region ensures they are received in order
	// even when multiple publishers are used.
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.EnableMessageOrdering = true
	key := "some-ordering-key"

	res := t.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	_, err = res.Get(ctx)
	if err != nil {
		// Error handling code can be added here.
		fmt.Printf("Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		t.ResumePublish(key)
	}

	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function resumePublish(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    messageOrdering: true,
  };

  // Publishes the message
  const publisher = pubSubClient.topic(topicNameOrId, publishOptions);
  try {
    const message = {
      data: dataBuffer,
      orderingKey: orderingKey,
    };
    const messageId = await publisher.publishMessage(message);
    console.log(`Message ${messageId} published.`);

    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    publisher.resumePublishing(orderingKey);
    return null;
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key" do |result|
    if result.succeeded?
      puts "Message ##{i} successfully published."
    else
      puts "Message ##{i} failed to publish"
      # Allow publishing to continue on "ordering-key" after processing the
      # failure.
      topic.resume_publish "ordering-key"
    end
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!

Nächste Schritte

Informationen zum Konfigurieren erweiterter Veröffentlichungsoptionen finden Sie hier: