Nachrichten veröffentlichen

In diesem Dokument erfahren Sie mehr zum Veröffentlichen von Nachrichten. Weitere Informationen zum Erstellen, Löschen und Verwalten von Themen und Abos finden Sie unter Themen und Abos verwalten. Informationen zum Empfangen von Nachrichten finden Sie im Abonnentenleitfaden.

Eine Publisher-Anwendung erstellt und sendet Nachrichten an ein Thema. Cloud Pub/Sub bietet mindestens einmalige Nachrichtenzustellung und die optimale Reihenfolge für vorhandene Abonnenten, wie in der Abonnentenübersicht erläutert.

Der allgemeine Ablauf für eine Publisher-Anwendung ist:

  1. Erstellen Sie eine Nachricht mit den entsprechenden Daten.
  2. Senden Sie eine Anfrage an den Cloud Pub/Sub-Server, um die Nachricht für das gewünschte Thema zu veröffentlichen.

Einrichtung

In Clientbibliotheken – Erste Schritte wird beschrieben, wie Sie die Umgebung in der Programmiersprache Ihrer Wahl einrichten.

Nachrichten zu einem Thema veröffentlichen

Bei der Verwendung von JSON über REST müssen die Nachrichtendaten base64-codiert sein. Die gesamte Anfrage, einschließlich einer oder mehrerer Nachrichten, muss nach der Decodierung kleiner als 10 MB sein. Beachten Sie, dass die Nachrichtennutzlast nicht leer sein darf; sie muss entweder ein nicht leeres Datenfeld oder mindestens ein Attribut enthalten.

Clientbibliotheken können Nachrichten synchron oder asynchron veröffentlichen, je nach der von Ihnen gewählten Programmiersprache. Die asynchrone Veröffentlichung ermöglicht die Stapelverarbeitung und einen höheren Durchsatz in Ihrer Anwendung.

Alle Clientbibliotheken unterstützen die asynchrone Veröffentlichung von Nachrichten. In der API-Referenzdokumentation können Sie für die von Ihnen gewählte Programmiersprache nachsehen, ob deren Clientbibliothek auch die synchrone Veröffentlichung von Nachrichten unterstützt, falls Sie diese Option vorziehen.

Bei der Veröffentlichung einer Nachricht wird eine vom Server generierte (und innerhalb des Themas eindeutige) ID zurückgegeben.

Protokoll

Anfrage:

POST     https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}
Antwort
200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

Befehlszeile

gcloud pubsub topics publish my-topic --message "hello"

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId));

go

var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topic)

for i := 0; i < n; i++ {
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Message " + strconv.Itoa(i)),
	})

	wg.Add(1)
	go func(i int, res *pubsub.PublishResult) {
		defer wg.Done()
		// The Get method blocks until a server-generated ID or
		// an error is returned for the published message.
		id, err := res.Get(ctx)
		if err != nil {
			// Error handling code can be added here.
			log.Output(1, fmt.Sprintf("Failed to publish: %v", err))
			atomic.AddUint64(&totalErrors, 1)
			return
		}
		fmt.Printf("Published message %d; msg ID: %v\n", i, id)
	}(i, result)
}

wg.Wait()

if totalErrors > 0 {
	return errors.New(
		fmt.Sprintf("%d of %d messages did not publish successfully",
			totalErrors, n))
}
return nil

java

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {

          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // details on the API exception
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + message);
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic)
            System.out.println(messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

php

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Neue benutzerdefinierte Attribute

Sie können benutzerdefinierte Attribute als Metadaten in Pub/Sub-Nachrichten einbetten. Attribute können Text- oder Bytestrings sein. Das Nachrichtenschema kann so dargestellt werden:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

Das JSON-Schema PubsubMessage wird als Teil der REST-Dokumentation und der RPC-Dokumentation veröffentlicht.

go

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte("Hello world!"),
	Attributes: map[string]string{
		"origin":   "golang",
		"username": "gcp",
	},
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published message with custom attributes; msg ID: %v\n", id)

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Latenz und Durchsatz mit Stapelverarbeitung ausgleichen

Die Cloud Pub/Sub-Clientbibliotheken fassen mehrere Nachrichten in einem Stapel für einen einzigen Aufruf des Diensts zusammen. Größere Stapel erhöhen den Nachrichtendurchsatz (Rate der pro CPU gesendeten Nachrichten). Die Kosten für die Stapelverarbeitung berechnen sich aus der Latenz für einzelne Nachrichten, die in der Warteschlange im Speicher stehen, bis der entsprechende Stapel gefüllt und bereit ist, um über das Netzwerk gesendet zu werden. Wenn Sie die Latenz minimieren möchten, sollte die Stapelverarbeitung deaktiviert werden. Besonders wichtig ist dies für Anwendungen, die eine einzelne Nachricht als Teil einer Anfrage/Antwort-Sequenz veröffentlichen. Ein typisches Beispiel für dieses Muster findet sich in serverlosen, ereignisgesteuerten Anwendungen, die Cloud Functions oder App Engine verwenden.

Nachrichten können anhand der Anfragegröße (in Byte), der Anzahl von Nachrichten und des Zeitpunktes in Stapeln zusammengefasst werden. Sie können die Standardeinstellungen wie in diesem Beispiel dargestellt überschreiben.

c#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId),
    settings: new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

java

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 10L; // default : 1 message

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(messageCountBatchSize)
        .setRequestByteThreshold(requestBytesThreshold)
        .setDelayThreshold(publishDelayThreshold)
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const [messageId] = await pubsub
  .topic(topicName, {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer);
console.log(`Message ${messageId} published.`);

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data)

print('Published messages.')

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Anfragen wiederholen

Fehlgeschlagene Veröffentlichungen werden automatisch wiederholt, außer bei Fehlern, die keine Wiederholungen rechtfertigen. Mit diesem Beispielcode wird dargestellt, wie ein Publisher mit benutzerdefinierten Wiederholungseinstellungen erstellt wird. Beachten Sie, dass nicht alle Clientbibliotheken benutzerdefinierte Wiederholungseinstellungen unterstützen. Dazu wird auf die API-Referenzdokumentation für die Sprache Ihrer Wahl verwiesen:

java

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(5); // default: 5 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 2.0
Duration maxRetryDelay = Duration.ofSeconds(600); // default : Long.MAX_VALUE
Duration totalTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration initialRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(retryDelay)
        .setRetryDelayMultiplier(retryDelayMultiplier)
        .setMaxRetryDelay(maxRetryDelay)
        .setTotalTimeout(totalTimeout)
        .setInitialRpcTimeout(initialRpcTimeout)
        .setMaxRpcTimeout(maxRpcTimeout)
        .build();

Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

node.js

// Imports the Google Cloud client library
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client
const client = new v1.PublisherClient({
  // optional auth parameters
});

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectId = 'my-project-id'
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

const formattedTopic = client.topicPath(projectId, topicName);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
  data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
  topic: formattedTopic,
  messages: messages,
};

// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
  retryCodes: [
    10, // 'ABORTED'
    1, // 'CANCELLED',
    4, // 'DEADLINE_EXCEEDED'
    13, // 'INTERNAL'
    8, // 'RESOURCE_EXHAUSTED'
    14, // 'UNAVAILABLE'
    2, // 'UNKNOWN'
  ],
  backoffSettings: {
    initialRetryDelayMillis: 100,
    retryDelayMultiplier: 1.3,
    maxRetryDelayMillis: 60000,
    initialRpcTimeoutMillis: 12000,
    rpcTimeoutMultiplier: 1.0,
    maxRpcTimeoutMillis: 30000,
    totalTimeoutMillis: 600000,
  },
};

const [response] = await client.publish(request, {retry: retrySettings});
console.log(`Message ${response.messageIds} published.`);

Gleichzeitigkeitserkennung

Die Unterstützung für Gleichzeitigkeit ist programmiersprachenabhängig. Weitere Informationen finden Sie in der API-Referenzdokumentation.

Im folgenden Beispiel wird dargestellt, wie die Gleichzeitigkeitserkennung in einem Publisher funktioniert:

go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

java

// create a publisher with a single threaded executor
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Publisher publisher =
    Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    message_future = publisher.publish(topic_path, data=data)
    futures.append(message_future)

print('Published message IDs:')
for future in futures:
    # result() blocks until the message is published.
    print(future.result())

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
Hat Ihnen diese Seite weitergeholfen? Teilen Sie uns Ihr Feedback mit:

Feedback geben zu...