Nachrichten zu Themen veröffentlichen

Mit Sammlungen den Überblick behalten Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.

In diesem Dokument erfahren Sie mehr zum Veröffentlichen von Nachrichten.

Eine Publisher-Anwendung erstellt und sendet Nachrichten an ein Thema. Pub/Sub bietet eine mindestens einmalige Nachrichtenzustellung und die optimale Reihenfolge für bestehende Abonnenten.

Der allgemeine Ablauf für eine Publisher-Anwendung ist:

  1. Eine Nachricht mit den entsprechenden Daten erstellen.
  2. Senden Sie eine Anfrage an den Pub/Sub-Server, um die Nachricht im angegebenen Thema zu veröffentlichen.

Hinweis

Bevor Sie den Veröffentlichungsworkflow konfigurieren, müssen Sie die folgenden Aufgaben ausführen:

  • Erstellen Sie ein Thema.

  • Optional: Erstellen Sie ein Abo.

Nachrichtenformat

Eine Nachricht besteht aus Feldern mit den Nachrichtendaten und Metadaten. Geben Sie in der Nachricht mindestens eine der folgenden Optionen an:

Wenn Sie die REST API verwenden, müssen die Nachrichtendaten Base64-codiert sein.

Der Pub/Sub-Dienst fügt der Nachricht die folgenden Felder hinzu:

  • Eine für das Thema eindeutige Nachrichten-ID
  • Ein Zeitstempel, wenn der Pub/Sub-Dienst die Nachricht empfängt

Nachrichten veröffentlichen

Sie können Nachrichten mit der Google Cloud CLI oder der Pub/Sub API veröffentlichen. Die Clientbibliotheken können Nachrichten asynchron veröffentlichen.

Console

So veröffentlichen Sie eine Nachricht:

  1. Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.

    Zur Seite „Pub/Sub-Themen“

  2. Klicken Sie auf die Themen-ID.

  3. Klicken Sie auf der Seite Themendetails unter Nachrichten auf Nachricht veröffentlichen.

  4. Geben Sie im Feld Nachrichtentext die Nachrichtendaten ein.

  5. Optional: Nachrichtenattribute hinzufügen.

    1. Klicken Sie auf Attribut hinzufügen.

    2. Geben Sie einen Schlüssel und einen Wert für das Attribut ein.

  6. Klicken Sie auf Veröffentlichen.

gcloud

Veröffentlichen Sie eine Nachricht mit dem Befehl gcloud pubsub topics publish:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  [--attribute=KEY="VALUE",...]

Dabei gilt:

  • TOPIC_ID: ID des Themas
  • MESSAGE_DATA: ein String mit den Nachrichtendaten
  • KEY ist der Schlüssel eines Nachrichtenattributs
  • VALUE: der Wert für den Schlüssel des Nachrichtenattributs

REST

Senden Sie eine POST-Anfrage wie die folgende, um eine Nachricht zu veröffentlichen:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Dabei gilt:

  • PROJECT_ID: Projekt-ID des Projekts mit dem Thema
  • TOPIC_ID: ID des Themas

Geben Sie im Anfragetext die folgenden Felder an:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
    }
  ]
}

Dabei gilt:

  • KEY: der Schlüssel eines Nachrichtenattributs
  • VALUE: der Wert für den Schlüssel des Nachrichtenattributs
  • MESSAGE_DATA ist ein base64-codierter String mit den Nachrichtendaten

Die Nachricht muss entweder ein nicht leeres Datenfeld oder mindestens ein Attribut enthalten.

Wenn die Anfrage erfolgreich ist, ist die Antwort ein JSON-Objekt mit der Nachrichten-ID. Das folgende Beispiel ist eine Antwort mit einer Nachrichten-ID:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error ocurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println("Published message ID: " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  try {
    const messageId = await pubSubClient
      .topic(topicNameOrId)
      .publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);

    process.exitCode = 1;
  }
}

PHP

Folgen Sie der Einrichtungsanleitung für PHP unter Schnellstart: Clientbibliotheken verwenden, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub PHP API.

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

begin
  topic.publish_async "This is a test message." do |result|
    raise "Failed to publish the message." unless result.succeeded?
    puts "Message published asynchronously."
  end

  # Stop the async_publisher to send all queued messages immediately.
  topic.async_publisher.stop.wait!
rescue StandardError => e
  puts "Received error while publishing: #{e.message}"
end

Nachdem Sie eine Nachricht veröffentlicht haben, gibt der Pub/Sub-Dienst die Nachrichten-ID an den Publisher zurück.

Attribute verwenden

Sie können benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten. Attribute können Text- oder Bytestrings sein. Pro Nachricht sind maximal 100 Attribute zulässig. Attributschlüssel dürfen nicht mit goog beginnen und dürfen 256 Byte nicht überschreiten. Attributwerte dürfen 1.024 Byte nicht überschreiten. Das Nachrichtenschema kann so dargestellt werden:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "orderingKey": string
}

Das JSON-Schema PubsubMessage wird als Teil der Dokumentation zu REST und RPC veröffentlicht.

gcloud

gcloud pubsub topics publish my-topic --message="hello" \
  --attribute="origin=gcloud-sample,username=gcp"

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;

public class PublishMessageWithCustomAttributesAsyncSample
{
    public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        var pubsubMessage = new PubsubMessage
        {
            // The data is any arbitrary ByteString. Here, we're using text.
            Data = ByteString.CopyFromUtf8(messageText),
            // The attributes provide metadata in a string-to-string dictionary.
            Attributes =
            {
                { "year", "2020" },
                { "author", "unknown" }
            }
        };
        string message = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"Published message {message}");
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithCustomAttributesExample(projectId, topicId);
  }

  public static void publishWithCustomAttributesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder()
              .setData(data)
              .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
              .build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with custom attributes: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: 'nodejs-sample',
    username: 'gcp',
  };

  const messageId = await pubSubClient
    .topic(topicNameOrId)
    .publishMessage({data: dataBuffer, attributes: customAttributes});
  console.log(`Message ${messageId} published.`);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Bestellschlüssel verwenden

Wenn Nachrichten denselben Reihenfolgeschlüssel haben und Sie die Nachrichten in derselben Region veröffentlichen, können Abonnenten die Nachrichten in der entsprechenden Reihenfolge empfangen. Das Veröffentlichen von Nachrichten mit Sortierungsschlüsseln kann die Latenz erhöhen. Verwenden Sie einen regionalen Endpunkt, um Nachrichten in derselben Region zu veröffentlichen.

Sie können Nachrichten mit Bestellschlüsseln über die Google Cloud Console, die Google Cloud CLI oder die Pub/Sub API veröffentlichen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Themen auf.

    Zur Seite „Pub/Sub-Themen“

  2. Klicken Sie auf die Themen-ID.

  3. Klicken Sie auf der Seite Themendetails unter Nachrichten auf Nachricht veröffentlichen.

  4. Geben Sie im Feld Nachrichtentext die Nachrichtendaten ein.

  5. Geben Sie im Feld Nachrichtenreihenfolge einen Reihenfolgeschlüssel ein.

  6. Klicken Sie auf Veröffentlichen.

gcloud

Verwenden Sie zum Veröffentlichen einer Nachricht mit einem Reihenfolgenschlüssel den Befehl gcloud pubsub topics publish und das Flag --ordering-key:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  --ordering-key=ORDERING_KEY

Dabei gilt:

  • TOPIC_ID: ID des Themas
  • MESSAGE_DATA: ein String mit den Nachrichtendaten
  • ORDERING_KEY ist ein String mit einem Sortierungsschlüssel

REST

Senden Sie eine POST-Anfrage wie die folgende, um eine Nachricht mit einem Reihenfolgeschlüssel zu veröffentlichen:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Dabei gilt:

  • PROJECT_ID: Projekt-ID des Projekts mit dem Thema
  • TOPIC_ID: ID des Themas

Geben Sie im Anfragetext die folgenden Felder an:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
      "ordering_key": "ORDERING_KEY",
    }
  ]
}

Dabei gilt:

  • KEY: der Schlüssel eines Nachrichtenattributs
  • VALUE: der Wert für den Schlüssel des Nachrichtenattributs
  • MESSAGE_DATA ist ein base64-codierter String mit den Nachrichtendaten
  • ORDERING_KEY ist ein String mit einem Sortierungsschlüssel

Die Nachricht muss entweder ein nicht leeres Datenfeld oder mindestens ein Attribut enthalten.

Wenn die Anfrage erfolgreich ist, ist die Antwort ein JSON-Objekt mit der Nachrichten-ID. Das folgende Beispiel ist eine Antwort mit einer Nachrichten-ID:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto message_id =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + datum.data + "]")
                              .SetOrderingKey(datum.ordering_key)
                              .Build());
    std::string ack_id = datum.ordering_key + "#" + datum.data;
    done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << ack_id << " published with id=" << *id
                << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishOrderedMessagesAsyncSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            // Sending messages to the same region ensures they are received in order even when multiple publishers are used.
            Endpoint = "us-east1-pubsub.googleapis.com:443",
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Sending messages to the same region ensures they are received in order
	// even when multiple publishers are used.
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)
	t.EnableMessageOrdering = true

	messages := []struct {
		message     string
		orderingKey string
	}{
		{
			message:     "message1",
			orderingKey: "key1",
		},
		{
			message:     "message2",
			orderingKey: "key2",
		},
		{
			message:     "message3",
			orderingKey: "key1",
		},
		{
			message:     "message4",
			orderingKey: "key2",
		},
	}

	for _, m := range messages {
		res := t.Publish(ctx, &pubsub.Message{
			Data:        []byte(m.message),
			OrderingKey: m.orderingKey,
		})

		wg.Add(1)
		go func(res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Printf("Failed to publish: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(res)
	}

	wg.Wait()

	if totalErrors > 0 {
		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
		return
	}

	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithOrderingKeysExample(projectId, topicId);
  }

  public static void publishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            // Sending messages to the same region ensures they are received in order
            // even when multiple publishers are used.
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .setEnableMessageOrdering(true)
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub({
  // Sending messages to the same region ensures they are received in order
  // even when multiple publishers are used.
  apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});

async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Be sure to set an ordering key that matches other messages
  // you want to receive in order, relative to each other.
  const message = {
    data: dataBuffer,
    orderingKey: orderingKey,
  };

  const publishOptions = {
    messageOrdering: true,
  };

  // Publishes the message
  const messageId = await pubSubClient
    .topic(topicNameOrId, publishOptions)
    .publishMessage(message);

  console.log(`Message ${messageId} published.`);

  return messageId;
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    print(future.result())

print(f"Published messages with ordering keys to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message \##{i}.",
                      ordering_key: "ordering-key"
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."

Wenn die Veröffentlichung mit einem Sortierungsschlüssel fehlschlägt, schlagen Nachrichten in der Warteschlange desselben Sortierschlüssels im Publisher fehl. Dies gilt auch für zukünftige Veröffentlichungsanfragen dieses Sortierschlüssels. Sie müssen die Veröffentlichung mit Sortierschlüsseln fortsetzen, wenn solche Fehler auftreten. Ein Beispiel für die Fortsetzung des Veröffentlichungsvorgangs finden Sie unter Anfragen mit Bestellschlüsseln wiederholen.

Schema verwenden

Sie können Nachrichten zu einem Thema veröffentlichen, das einem Schema zugeordnet ist. Weitere Informationen finden Sie unter Schemas erstellen und verwalten. Sie müssen die Nachrichten in dem Schema und Format codieren, das Sie beim Erstellen des Themas angegeben haben.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Avro
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto constexpr kNewYork =
      R"js({ "name": "New York", "post_abbr": "NY" })js";
  auto constexpr kPennsylvania =
      R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto const* data : {kNewYork, kPennsylvania}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
            .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}
Proto
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<std::pair<std::string, std::string>> states{
      {"New York", "NY"},
      {"Pennsylvania", "PA"},
  };
  std::vector<future<void>> done;
  auto handler = [](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
  };
  for (auto& data : states) {
    google::cloud::pubsub::samples::State state;
    state.set_name(data.first);
    state.set_post_abbr(data.second);
    done.push_back(publisher
                       .Publish(pubsub::MessageBuilder{}
                                    .SetData(state.SerializeAsString())
                                    .Build())
                       .then(handler));
  }
  // Block until all messages are published.
  for (auto& d : done) d.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.

Avro

using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishAvroMessagesAsyncSample
{
    public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {

            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        using (var ms = new MemoryStream())
                        {
                            var encoder = new BinaryEncoder(ms);
                            var writer = new SpecificDefaultWriter(state.Schema);
                            writer.Write(state, encoder);
                            messageId = await publisher.PublishAsync(ms.ToArray());
                        }
                        break;
                    case Encoding.Json:
                        var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}
Proto

using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishProtoMessagesAsyncSample
{
    public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
        var topic = publishApi.GetTopic(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageStates.Select(async state =>
        {
            try
            {
                string messageId = null;
                switch (topic.SchemaSettings.Encoding)
                {
                    case Encoding.Binary:
                        var binaryMessage = state.ToByteString();
                        messageId = await publisher.PublishAsync(binaryMessage);
                        break;
                    case Encoding.Json:
                        var jsonMessage = JsonFormatter.Default.Format(state);
                        messageId = await publisher.PublishAsync(jsonMessage);
                        break;
                }
                Console.WriteLine($"Published message {messageId}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Avro
import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	avroSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("ioutil.ReadFile err: %v", err)
	}
	codec, err := goavro.NewCodec(string(avroSource))
	if err != nil {
		return fmt.Errorf("goavro.NewCodec err: %v", err)
	}
	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %v", err)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = codec.BinaryFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.BinaryFromNative err: %v", err)
		}
	case pubsub.EncodingJSON:
		msg, err = codec.TextualFromNative(nil, record)
		if err != nil {
			return fmt.Errorf("codec.TextualFromNative err: %v", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %v", err)
	}
	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
	return nil
}
Proto
import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func publishProtoMessages(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	state := &statepb.State{
		Name:     "Alaska",
		PostAbbr: "AK",
	}

	// Get the topic encoding type.
	t := client.Topic(topicID)
	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config err: %v", err)
	}
	encoding := cfg.SchemaSettings.Encoding

	var msg []byte
	switch encoding {
	case pubsub.EncodingBinary:
		msg, err = proto.Marshal(state)
		if err != nil {
			return fmt.Errorf("proto.Marshal err: %v", err)
		}
	case pubsub.EncodingJSON:
		msg, err = protojson.Marshal(state)
		if err != nil {
			return fmt.Errorf("protojson.Marshal err: %v", err)
		}
	default:
		return fmt.Errorf("invalid encoding: %v", encoding)
	}

	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})
	_, err = result.Get(ctx)
	if err != nil {
		return fmt.Errorf("result.Get: %v", err)
	}
	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Avro

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;

public class PublishAvroRecordsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with an Avro schema.
    String topicId = "your-topic-id";

    publishAvroRecordsExample(projectId, topicId);
  }

  public static void publishAvroRecordsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    Publisher publisher = null;

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      // Prepare to serialize the object to the output stream.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();

      Encoder encoder = null;

      // Prepare an appropriate encoder for publishing to the topic.
      switch (encoding) {
        case BINARY:
          System.out.println("Preparing a BINARY encoder...");
          encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
          break;

        case JSON:
          System.out.println("Preparing a JSON encoder...");
          encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
          break;

        default:
          break block;
      }

      // Encode the object and write it to the output stream.
      state.customEncode(encoder);
      encoder.flush();

      // Publish the encoded object as a Pub/Sub message.
      ByteString data = ByteString.copyFrom(byteStream.toByteArray());
      PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
      System.out.println("Publishing message: " + message);

      ApiFuture<String> future = publisher.publish(message);
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Proto

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;

public class PublishProtobufMessagesExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use a topic created with a proto schema.
    String topicId = "your-topic-id";

    publishProtobufMessagesExample(projectId, topicId);
  }

  public static void publishProtobufMessagesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {

    Encoding encoding = null;

    TopicName topicName = TopicName.of(projectId, topicId);

    // Get the topic encoding type.
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
    }

    Publisher publisher = null;

    // Instantiate a protoc-generated class defined in `us-states.proto`.
    State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();

    block:
    try {
      publisher = Publisher.newBuilder(topicName).build();

      PubsubMessage.Builder message = PubsubMessage.newBuilder();

      // Prepare an appropriately formatted message based on topic encoding.
      switch (encoding) {
        case BINARY:
          message.setData(state.toByteString());
          System.out.println("Publishing a BINARY-formatted message:\n" + message);
          break;

        case JSON:
          String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
          message.setData(ByteString.copyFromUtf8(jsonString));
          System.out.println("Publishing a JSON-formatted message:\n" + message);
          break;

        default:
          break block;
      }

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      System.out.println("Published message ID: " + future.get());

    } finally {
      if (publisher != null) {
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Avro
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishAvroRecords(topicNameOrId) {
  // Get the topic metadata to learn about its schema encoding.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Make an encoder using the official avro-js library.
  const definition = fs
    .readFileSync('system-test/fixtures/provinces.avsc')
    .toString();
  const type = avro.parse(definition);

  // Encode the message.
  const province = {
    name: 'Ontario',
    post_abbr: 'ON',
  };
  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = type.toBuffer(province);
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(type.toString(province));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }

  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publish(dataBuffer);
  console.log(`Avro record ${messageId} published.`);
}
Protocol Buffer
/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishProtobufMessages(topicNameOrId) {
  // Get the topic metadata to learn about its schema.
  const topic = pubSubClient.topic(topicNameOrId);
  const [topicMetadata] = await topic.getMetadata();
  const topicSchemaMetadata = topicMetadata.schemaSettings;

  if (!topicSchemaMetadata) {
    console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
    return;
  }
  const schemaEncoding = topicSchemaMetadata.encoding;

  // Encode the message.
  const province = {
    name: 'Ontario',
    postAbbr: 'ON',
  };

  // Make an encoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = await protobuf.load('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');
  const message = Province.create(province);

  let dataBuffer;
  switch (schemaEncoding) {
    case Encodings.Binary:
      dataBuffer = Buffer.from(Province.encode(message).finish());
      break;
    case Encodings.Json:
      dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
      break;
    default:
      console.log(`Unknown schema encoding: ${schemaEncoding}`);
      break;
  }

  if (!dataBuffer) {
    console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
    return;
  }

  const messageId = await topic.publish(dataBuffer);
  console.log(`Protobuf message ${messageId} published.`);
}

PHP

Folgen Sie der Einrichtungsanleitung für PHP unter Schnellstart: Clientbibliotheken verwenden, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub PHP API.

Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroDataIOWriter;

/**
 * Publish a message using an AVRO schema.
 *
 * This sample uses `wikimedia/avro` for AVRO encoding.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $definitionFile
 * @return void
 */
function publish_avro_records($projectId, $topicId, $definitionFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = file_get_contents($definitionFile);

    $messageData = [
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ];

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as AVRO binary.
        $io = new AvroStringIO();
        $schema = AvroSchema::parse($definition);
        $writer = new AvroIODatumWriter($schema);
        $dataWriter = new AvroDataIOWriter($io, $writer, $schema);

        $dataWriter->append($messageData);

        $dataWriter->close();

        // AVRO binary data must be base64-encoded.
        $encodedMessageData = base64_encode($io->string());
    } else {
        // encode as JSON.
        $encodedMessageData = json_encode($messageData);
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}
Protocol Buffer
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;

use Utilities\StateProto;

/**
 * Publish a message using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $topicId
 * @return void
 */
function publish_proto_messages($projectId, $topicId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $messageData = new StateProto([
        'name' => 'Alaska',
        'post_abbr' => 'AK',
    ]);

    $topic = $pubsub->topic($topicId);

    // get the encoding type.
    $topicInfo = $topic->info();
    $encoding = '';
    if (isset($topicInfo['schemaSettings']['encoding'])) {
        $encoding = $topicInfo['schemaSettings']['encoding'];
    }

    // if encoding is not set, we can't continue.
    if ($encoding === '') {
        printf('Topic %s does not have schema enabled', $topicId);
        return;
    }

    // If you are using gRPC, encoding may be an integer corresponding to an
    // enum value on Google\Cloud\PubSub\V1\Encoding.
    if (!is_string($encoding)) {
        $encoding = Encoding::name($encoding);
    }

    $encodedMessageData = '';
    if ($encoding == 'BINARY') {
        // encode as protobuf binary.
        $encodedMessageData = $messageData->serializeToString();
    } else {
        // encode as JSON.
        $encodedMessageData = $messageData->serializeToJsonString();
    }

    $topic->publish(['data' => $encodedMessageData]);

    printf('Published message with %s encoding', $encoding);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Avro
from avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
avro_schema = schema.parse(open(avsc_file, "rb").read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")
Protocol Buffer
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding

from utilities import us_states_pb2  # type: ignore

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Instantiate a protoc-generated class defined in `us-states.proto`.
    state = us_states_pb2.StateProto()
    state.name = "Alaska"
    state.post_abbr = "AK"

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        data = state.SerializeToString()
        print(f"Preparing a binary-encoded message:\n{data}")
    elif encoding == Encoding.JSON:
        json_object = MessageToJson(state)
        data = str(json_object).encode("utf-8")
        print(f"Preparing a JSON-encoded message:\n{data}")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

Avro
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

record = { "name" => "Alaska", "post_abbr" => "AK" }

if topic.message_encoding_binary?
  require "avro"
  avro_schema = Avro::Schema.parse File.read(avsc_file)
  writer = Avro::IO::DatumWriter.new avro_schema
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new buffer
  writer.write record, encoder
  topic.publish buffer
  puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
  require "json"
  topic.publish record.to_json
  puts "Published JSON-encoded AVRO message."
else
  raise "No encoding specified in #{topic.name}."
end
Protocol Buffer
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
require_relative "utilities/us-states_pb"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"

if topic.message_encoding_binary?
  topic.publish Utilities::StateProto.encode(state)
  puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
  topic.publish Utilities::StateProto.encode_json(state)
  puts "Published JSON-encoded protobuf message."
else
  raise "No encoding specified in #{topic.name}."
end

Batchnachrichten in einer Veröffentlichungsanfrage

Sie können die Pub/Sub-Clientbibliothek für Ihren Publisher verwenden, um Nachrichten zu einem Thema zu veröffentlichen. Die Clientbibliothek verwendet die Batchfunktion, um mehrere Nachrichten zusammen in einem Dienstaufruf zu veröffentlichen. Durch die Stapelung oder Gruppierung von Nachrichten kann Pub/Sub einen höheren Durchsatz an Nachrichten erreichen. Sie können die Größe des Batches an Ihre geschäftlichen Anforderungen anpassen.

Batchnachrichten sind in einer Clientbibliothek standardmäßig aktiviert. Batch-Nachrichten erstellen Latenz für einzelne Nachrichten. Einzelne Nachrichten müssen so lange im Arbeitsspeicher abgelegt werden, bis der entsprechende Batch gefüllt ist. Erst dann werden die Nachrichten zum Thema veröffentlicht.

Wenn die Kosten unberücksichtigt sind, können Sie mehrere Publisher-Clients erstellen und Batchnachrichten deaktivieren. Dieser Prozess minimiert die Latenz und maximiert den Durchsatz durch horizontale Skalierung auf die Anzahl der Publisher. Kosten werden jedoch oft berücksichtigt. Das Senden mehrerer Nachrichten in einer einzelnen Veröffentlichungsanfrage ist eine Möglichkeit, mit weniger Publishern einen vergleichbaren Durchsatz zu erreichen. Wenn Sie bereit sind, gegen Latenz zu sparen, um Kosten zu sparen, insbesondere wenn Ihre Anwendung innerhalb kurzer Zeit relativ viele Nachrichten verarbeitet, können Sie Batchnachrichten verwenden.

Mit der Batch-Nachrichtenfunktion können Sie die Batchgröße in Byte oder der Anzahl von Nachrichten sowie den Zeitpunkt konfigurieren, nach dem der Batch veröffentlicht wird. Mit kleinen Batches von Nachrichten während der Spitzenveröffentlichung können Sie die Latenz steuern.

Batch-Messaging in einer Clientbibliothek konfigurieren

Sie können Nachrichten basierend auf der Größe der Nachrichtenanfrage, der Anzahl der Nachrichten und der Zeit im Batch zusammenfassen. Die Standardwerte für die Batch-Messaging-Variablen und die Namen der Variablen können je nach Clientbibliothek variieren. In der Python-Clientbibliothek steuern beispielsweise die folgenden Variablen Batchnachrichten:

Variable Beschreibung Wert
Max. Nachrichten Die Anzahl der Nachrichten in einem Batch. Standard=100
Max. Byte Die maximale Größe eines Batches in MB. Standard=1 MB
maximale Latenzzeit Die Zeit, nach der ein Batch veröffentlicht wird, auch wenn der Batch nicht gefüllt ist. Standard=10 ms

Sie können einen oder alle drei Werte in der Clientbibliothek angeben. Wenn einer der Werte für die Batchnachrichtenvariablen erreicht ist, veröffentlicht die Clientbibliothek den nächsten Nachrichten-Batch.

In den folgenden Codebeispielen erfahren Sie, wie Sie die Batch-Nachrichteneinstellungen für Ihren Publisher konfigurieren.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default, the publisher will flush a batch after 10ms, after it
  // contains more than 100 message, or after it contains more than 1MiB of
  // data, whichever comes first. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
          .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
          .set<pubsub::MaxBatchMessagesOption>(200)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishBatchedMessagesAsyncSample
{
    public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        // Default Settings:
        // byteCountThreshold: 1000000
        // elementCountThreshold: 100
        // delayThreshold: 10 milliseconds
        var customSettings = new PublisherClient.Settings
        {
            BatchingSettings = new BatchingSettings(
                elementCountThreshold: 50,
                byteCountThreshold: 10240,
                delayThreshold: TimeSpan.FromMilliseconds(500))
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"time"

	"cloud.google.com/go/pubsub"
)

func publishWithSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()
	var results []*pubsub.PublishResult
	var resultErrors []error
	t := client.Topic(topicID)
	t.PublishSettings.ByteThreshold = 5000
	t.PublishSettings.CountThreshold = 10
	t.PublishSettings.DelayThreshold = 100 * time.Millisecond

	for i := 0; i < 10; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})
		results = append(results, result)
	}
	// The Get method blocks until a server-generated ID or
	// an error is returned for the published message.
	for i, res := range results {
		id, err := res.Get(ctx)
		if err != nil {
			resultErrors = append(resultErrors, err)
			fmt.Fprintf(w, "Failed to publish: %v", err)
			continue
		}
		fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
	}
	if len(resultErrors) != 0 {
		return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
	}
	fmt.Fprintf(w, "Published messages with batch settings.")
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithBatchSettingsExample(projectId, topicId);
  }

  public static void publishWithBatchSettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 1000 bytes
      long messageCountBatchSize = 100L; // default : 100 message

      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setElementCountThreshold(messageCountBatchSize)
              .setRequestByteThreshold(requestBytesThreshold)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishBatchedMessages(
  topicNameOrId,
  data,
  maxMessages,
  maxWaitTime
) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime * 1000,
    },
  };
  const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);

  const promises = [];
  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        const messageId = await batchPublisher.publishMessage({
          data: dataBuffer,
        });
        console.log(`Message ${messageId} published.`);
      })()
    );
  }
  await Promise.all(promises);
}

PHP

Folgen Sie der Einrichtungsanleitung für PHP unter Schnellstart: Clientbibliotheken verwenden, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub PHP API.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * The publisher should be used in conjunction with the `google-cloud-batch`
 * daemon, which should be running in the background.
 *
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message    The message to publish.
 */
function publish_message_batch($projectId, $topicName, $message)
{
    // Check if the batch daemon is running.
    if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
        trigger_error(
            'The batch daemon is not running. Call ' .
            '`vendor/bin/google-cloud-batch daemon` from ' .
            'your project root to start the daemon.',
            E_USER_NOTICE
        );
    }

    $batchOptions = [
        'batchSize' => 100, // Max messages for each batch.
        'callPeriod' => 0.01, // Max time in seconds between each batch publish.
    ];

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $publisher = $topic->batchPublisher([
        'batchOptions' => $batchOptions
    ]);

    for ($i = 0; $i < 10; $i++) {
        $publisher->publish(['data' => $message]);
    }

    print('Messages enqueued for publication.' . PHP_EOL);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=10,  # default 100
    max_bytes=1024,  # default 1 MB
    max_latency=1,  # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = future.result()
    print(message_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch multiple messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with batch settings to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Batch-Nachrichten deaktivieren

Wenn Sie die Batchverarbeitung in der Clientbibliothek deaktivieren möchten, setzen Sie den Wert von max_messages auf 1.

Batch-Nachrichten und geordnete Zustellung

Wenn bei der geordneten Zustellung eine Nachricht im Batch nicht bestätigt wird, werden alle Nachrichten im Batch, einschließlich der Nachrichten, die vor der Nachricht gesendet wurden, die nicht bestätigt wurden, noch einmal zugestellt.

Kontingente und Limits für Batchnachrichten

Bevor Sie Batch-Nachrichten konfigurieren, sollten Sie Faktoren wie das Durchsatzkontingent für die Veröffentlichung und die maximale Größe eines Batches berücksichtigen. Die übergeordneten Clientbibliotheken sorgen dafür, dass Batchanfragen innerhalb der angegebenen Limits bleiben.

  • 1.000 Byte ist die minimale Anfragegröße, die für Kostenzwecke berücksichtigt wird, auch wenn die tatsächliche Nachrichtengröße möglicherweise kleiner als 1.000 Byte ist.
  • Pub/Sub hat ein Limit von 10 MB oder 1.000 Nachrichten für eine einzelne Batchveröffentlichungsanfrage.

Weitere Informationen finden Sie unter Kontingente und Limits für Pub/Sub.

Nachrichten komprimieren

Wenn Sie Pub/Sub verwenden, um Nachrichten zu veröffentlichen, die viele Daten enthalten, können Sie Ihre Daten mit gRPC komprimieren, um Netzwerkkosten zu sparen, bevor Ihr Publisher-Client die Veröffentlichungsanfrage sendet. Die Pub/Sub-Komprimierung für gRPC verwendet den Gzip-Algorithmus.

Das Komprimierungsverhältnis für die Verwendung der clientseitigen gRPC-Komprimierungsfunktion variiert je nach Publisher-Client und ist von den folgenden Faktoren abhängig:

  • Datenmenge. Das Komprimierungsverhältnis verbessert sich, wenn die Größe der Nutzlast von einigen hundert Byte auf viele Kilobyte an Daten zunimmt. Die Batcheinstellungen einer Veröffentlichungsanfrage bestimmen die Datenmenge, die in der jeweiligen Veröffentlichungsanfrage enthalten ist. Wir empfehlen, die Batcheinstellungen in Verbindung mit der gRPC-Komprimierung zu aktivieren.

  • Datentyp: Textbasierte Daten wie JSON oder XML sind im Vergleich zu Binärdaten wie Bildern komprimierbarer.

Wenn der Publisher-Client Google Cloud verwendet, können Sie mit dem Messwert Gesendete Byte (instance/network/sent_bytes_count) den Veröffentlichungsdurchsatz in Byte messen. Wenn Ihr Publisher-Client eine andere Anwendung verwendet, müssen Sie die kundenspezifischen Tools für die Messung verwenden.

Das Codebeispiel in diesem Abschnitt zeigt ein Codebeispiel für die Java-Clientbibliothek, das auch die gRPC-Komprimierung enthält.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace g = ::google::cloud;
namespace pubsub = ::google::cloud::pubsub;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      g::Options{}
          // Compress any batch of messages over 10 bytes. By default, no
          // messages are compressed, set this to 0 to compress all batches,
          // regardless of their size.
          .set<pubsub::CompressionThresholdOption>(10)
          // Compress using the GZIP algorithm. By default, the library uses
          // GRPC_COMPRESS_DEFLATE.
          .set<pubsub::CompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](g::future<g::StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithGrpcCompressionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithGrpcCompressionExample(projectId, topicId);
  }

  public static void publishWithGrpcCompressionExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // Create a publisher and set enable compression to true.
    Publisher publisher = null;
    try {
      // Enable compression and configure the compression threshold to 10 bytes (default to 240 B).
      // Publish requests of sizes > 10 B (excluding the request headers) will get compressed.
      // The number of messages in a publish request is determined by publisher batch settings.
      // Batching is turned off by default, i.e. each publish request contains only one message.
      publisher =
          Publisher.newBuilder(topicName)
              .setEnableCompression(true)
              .setCompressionBytesThreshold(10L)
              .build();

      byte[] bytes = new byte[1024];
      ByteString data = ByteString.copyFrom(bytes);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic).
      // You can look up the actual size of the outbound data using the Java Logging API.
      // Configure logging properties as shown in
      // https://github.com/googleapis/java-pubsub/tree/main/samples/snippets/src/main/resources/logging.properties
      // and look for "OUTBOUND DATA" with "length=" in the output log.
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a compressed message of message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Anfragen wiederholen

Fehlgeschlagene Veröffentlichungen werden automatisch wiederholt, außer bei Fehlern, die keine Wiederholungen rechtfertigen. Mit diesem Beispielcode wird dargestellt, wie ein Publisher mit benutzerdefinierten Wiederholungseinstellungen erstellt wird. Beachten Sie, dass nicht alle Clientbibliotheken benutzerdefinierte Wiederholungseinstellungen unterstützen. Dazu wird auf die API-Referenzdokumentation für die Sprache Ihrer Wahl verwiesen:

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));

  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                               maxAttempts: maxAttempts,
                               initialBackoff: initialBackoff,
                               maxBackoff: maxBackoff,
                               backoffMultiplier: backoffMultiplier,
                               retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                       .WithTimeout(totalTimeout)
            }
        }.BuildAsync();
        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");
    }
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
  // optional auth parameters
});
async function publishWithRetrySettings(projectId, topicNameOrId, data) {
  const formattedTopic = publisherClient.projectTopicPath(
    projectId,
    topicNameOrId
  );

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messagesElement = {
    data: dataBuffer,
  };
  const messages = [messagesElement];

  // Build the request
  const request = {
    topic: formattedTopic,
    messages: messages,
  };

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED',
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 1.3,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 5000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 600000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  const [response] = await publisherClient.publish(request, {
    retry: retrySettings,
  });
  console.log(`Message ${response.messageIds} published.`);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")

Wiederholungseinstellungen steuern, wie die Pub/Sub-Clientbibliotheken Veröffentlichungsanfragen wiederholen. Die Clientbibliotheken haben eine der folgenden Wiederholungseinstellungen:

  • Zeitüberschreitung der ersten Anfrage: Die Zeitspanne, nach der eine Clientbibliothek nicht mehr auf die Fertigstellung der ersten Veröffentlichungsanfrage wartet.
  • Wiederholungsverzögerung: Die Zeitspanne, die eine Clientbibliothek nach der Zeitüberschreitung einer Anfrage wartet, bevor sie die Anfrage wiederholt.
  • Gesamtzeitüberschreitung: Die Zeitspanne, nach der eine Clientbibliothek keine weiteren Veröffentlichungsanfragen mehr wiederholt.

Damit Veröffentlichungsanfragen wiederholt werden können, muss die Zeitüberschreitung der ersten Anfrage kürzer sein als die Gesamtzeitüberschreitung. Wenn Sie beispielsweise exponentiellen Backoff verwenden, berechnen die Clientbibliotheken das Anfragezeitlimit und die Wiederholungsverzögerung folgendermaßen:

  • Nach jeder Veröffentlichungsanfrage erhöht sich das Anfragezeitlimit um den Zeitüberschreitungsfaktor der Anfrage bis zum maximalen Anfragezeitlimit.
  • Nach jedem Wiederholungsversuch erhöht sich die Wiederholungsverzögerung um den Wiederholungsverzögerungsfaktor bis zur maximalen Wiederholungsverzögerung.

Anfragen mit Bestellschlüsseln noch einmal senden

Wenn eine Clientbibliothek eine Anfrage wiederholt und die Nachricht einen Reihenfolgeschlüssel hat, wiederholt die Clientbibliothek die Anfrage unabhängig von den Wiederholungseinstellungen wiederholt.

Wenn ein nicht wiederholbarer Fehler auftritt, veröffentlicht die Clientbibliothek die Nachricht nicht und veröffentlicht keine weiteren Nachrichten mit demselben Reihenfolgeschlüssel. Wenn ein Publisher beispielsweise eine Nachricht an ein nicht vorhandenes Thema sendet, tritt ein nicht wiederholbarer Fehler auf. Wenn Sie weitere Nachrichten mit demselben Reihenfolgenschlüssel veröffentlichen möchten, rufen Sie eine Methode auf, um die Veröffentlichung fortzusetzen und dann wieder zu veröffentlichen.

Im folgenden Beispiel wird gezeigt, wie Nachrichten mit demselben Reihenfolgeschlüssel wieder veröffentlicht werden.

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ResumePublishSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
                publisher.ResumePublish(keyAndMessage.Item1);
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Sending messages to the same region ensures they are received in order
	// even when multiple publishers are used.
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.EnableMessageOrdering = true
	key := "some-ordering-key"

	res := t.Publish(ctx, &pubsub.Message{
		Data:        []byte("some-message"),
		OrderingKey: key,
	})
	_, err = res.Get(ctx)
	if err != nil {
		// Error handling code can be added here.
		fmt.Printf("Failed to publish: %s\n", err)

		// Resume publish on an ordering key that has had unrecoverable errors.
		// After such an error publishes with this ordering key will fail
		// until this method is called.
		t.ResumePublish(key)
	}

	fmt.Fprint(w, "Published a message with ordering key successfully\n")
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResumePublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    resumePublishWithOrderingKeysExample(projectId, topicId);
  }

  public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            .setEnableMessageOrdering(true)
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
                // (Beta) Must call resumePublish to reset key and continue publishing with order.
                publisher.resumePublish(pubsubMessage.getOrderingKey());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function resumePublish(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const publishOptions = {
    messageOrdering: true,
  };

  // Publishes the message
  const publisher = pubSubClient.topic(topicNameOrId, publishOptions);
  try {
    const message = {
      data: dataBuffer,
      orderingKey: orderingKey,
    };
    const messageId = await publisher.publishMessage(message);
    console.log(`Message ${messageId} published.`);

    return messageId;
  } catch (e) {
    console.log(`Could not publish: ${e}`);
    publisher.resumePublishing(orderingKey);
    return null;
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)

print(f"Resumed publishing messages with ordering keys to {topic_path}.")

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message \##{i}.",
                      ordering_key: "ordering-key" do |result|
    if result.succeeded?
      puts "Message \##{i} successfully published."
    else
      puts "Message \##{i} failed to publish"
      # Allow publishing to continue on "ordering-key" after processing the
      # failure.
      topic.resume_publish "ordering-key"
    end
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!

Ablaufsteuerung

Ein Publisher-Client kann versuchen, Nachrichten schneller zu veröffentlichen, als dieser Client Daten an den Pub/Sub-Dienst senden kann. Clients sind durch viele Faktoren begrenzt, darunter:

  • Maschinen-CPU, RAM und Netzwerkkapazität
  • Netzwerkeinstellungen, z. B. Anzahl der ausstehenden Anfragen und verfügbare Bandbreite
  • Die Latenz jeder Veröffentlichungsanfrage, die hauptsächlich durch die Netzwerkverbindungen zwischen dem Pub/Sub-Dienst, dem Client und Google Cloud bestimmt wird

Wenn die Rate von Veröffentlichungsanfragen diese Limits überschreitet, werden Anfragen im Arbeitsspeicher angesammelt, bis sie mit dem Fehler DEADLINE_EXCEEDED fehlschlagen. Dies ist besonders wahrscheinlich, wenn Zehntausende von Nachrichten in einer Schleife veröffentlicht werden und dadurch Tausende von Anfragen in Millisekunden generiert werden.

Sie können dieses Problem mithilfe der serverseitigen Messwerte in Monitoring diagnostizieren. Sie können die Anfragen, die mit DEADLINE_EXCEEDED fehlgeschlagen sind, nicht sehen, sondern nur die erfolgreichen Anfragen. Die Rate erfolgreicher Anfragen gibt die Durchsatzkapazität Ihrer Clientmaschinen an und bietet eine Grundlage für die Konfiguration der Ablaufsteuerung.

Zur Seite „Monitoring“

Wenn Sie die Probleme mit der Ablaufrate minimieren möchten, konfigurieren Sie Ihren Publisher-Client mit Ablaufsteuerung, um die Rate der Veröffentlichungsanfragen zu begrenzen. Sie können die maximale Anzahl der Bytes konfigurieren, die für ausstehende Anfragen zugewiesen sind, und die maximale Anzahl zulässiger ausstehender Nachrichten. Legen Sie diese Limits entsprechend der Durchsatzkapazität Ihrer Clientmaschinen fest.

Die Publisher-Ablaufsteuerung ist über die Pub/Sub-Clientbibliotheken in den folgenden Sprachen verfügbar:

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Configure the publisher to block if either (1) 100 or more messages, or
  // (2) messages with 100MiB worth of data have not been acknowledged by the
  // service. By default the publisher never blocks, and its capacity is only
  // limited by the system's memory.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks)));

  std::vector<future<void>> ids;
  for (char const* data : {"a", "b", "c"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // By default, messages are not batched.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publish 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println(
          "Published " + messageIds.size() + " messages with flow control settings.");

      if (publisher != null) {
        // When finished with the publisher, shut down to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId) {
  // Create publisher options
  const options = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
    message_limit=100,  # 100 messages
    byte_limit=10 * 1024 * 1024,  # 10 MiB
    limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = publish_future.result()
    print(message_id)

# Publish 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1000):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")

Gleichzeitigkeitserkennung

Die Unterstützung für Gleichzeitigkeit ist programmiersprachenabhängig. Weitere Informationen finden Sie in der API-Referenzdokumentation.

Im folgenden Beispiel wird dargestellt, wie die Gleichzeitigkeitserkennung in einer Publisher-Anwendung funktioniert:

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Publisher überwachen

Cloud Monitoring bietet eine Reihe von Messwerten zur Überwachung von Themen.

Informationen zum Monitoring eines Themas und zur Pflege eines fehlerfreien Publishers finden Sie unter Stabilen Publisher verwalten.

Nächste Schritte