Memublikasikan pesan ke topik

Dokumen ini memberikan informasi tentang memublikasikan pesan.

Aplikasi penayang membuat dan mengirim pesan ke topik. Pub/Sub menawarkan pengiriman pesan minimal satu kali dan pengurutan dengan upaya terbaik kepada pelanggan yang ada.

Alur umum untuk aplikasi penayang adalah:

  1. Buat pesan yang berisi data Anda.
  2. Kirim permintaan ke server Pub/Sub untuk memublikasikan pesan ke topik yang ditentukan.

Sebelum memulai

Sebelum mengonfigurasi alur kerja publikasi, pastikan Anda telah menyelesaikan tugas-tugas berikut:

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan guna memublikasikan pesan ke topik, minta administrator untuk memberi Anda peran IAM Pub/Sub Publisher (roles/pubsub.publisher) pada topik. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Anda memerlukan izin tambahan untuk membuat atau memperbarui topik dan langganan.

Format pesan

Pesan terdiri dari kolom dengan data dan metadata pesan. Tentukan setidaknya salah satu dari hal berikut dalam pesan:

Layanan Pub/Sub menambahkan kolom berikut ke pesan:

  • ID pesan yang unik untuk topik
  • Stempel waktu saat layanan Pub/Sub menerima pesan

Untuk mempelajari pesan lebih lanjut, lihat Format pesan.

Memublikasikan pesan

Anda dapat memublikasikan pesan dengan konsol Google Cloud, Google Cloud CLI, Pub/Sub API, dan library klien. Library klien dapat memublikasikan pesan secara asinkron.

Contoh berikut menunjukkan cara memublikasikan pesan ke topik.

Konsol

Untuk memublikasikan pesan, ikuti langkah-langkah berikut:

  1. Di konsol Google Cloud, buka halaman Pub/Sub topics.

    Buka halaman topik Pub/Sub

  2. Klik ID topik.

  3. Di halaman Topic details pada bagian Messages, klik Publish message.

  4. Di kolom Isi pesan, masukkan data pesan.

  5. Klik Publikasikan.

gcloud

Untuk memublikasikan pesan, gunakan perintah gcloud pubsub topics publish:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  [--attribute=KEY="VALUE",...]

Ganti kode berikut:

  • TOPIC_ID: ID topik
  • MESSAGE_DATA: string dengan data pesan
  • KEY: kunci atribut pesan
  • VALUE: nilai untuk kunci atribut pesan

REST

Untuk memublikasikan pesan, kirim permintaan POST seperti berikut:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Ganti kode berikut:

  • PROJECT_ID: project ID project dengan topik
  • TOPIC_ID: ID topik

Tentukan kolom berikut dalam isi permintaan:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
    }
  ]
}

Ganti kode berikut:

  • KEY: kunci atribut pesan
  • VALUE: nilai untuk kunci atribut pesan
  • MESSAGE_DATA: string berenkode base64 dengan data pesan

Pesan harus berisi kolom data yang tidak kosong atau setidaknya satu atribut.

Jika permintaan berhasil, responsnya adalah objek JSON dengan ID pesan. Contoh berikut adalah respons dengan ID pesan:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println("Published message ID: " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(
      `Received error while publishing: ${(error as Error).message}`
    );
    process.exitCode = 1;
  }
}

PHP

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan PHP di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API PHP Pub/Sub.

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

begin
  topic.publish_async "This is a test message." do |result|
    raise "Failed to publish the message." unless result.succeeded?
    puts "Message published asynchronously."
  end

  # Stop the async_publisher to send all queued messages immediately.
  topic.async_publisher.stop.wait!
rescue StandardError => e
  puts "Received error while publishing: #{e.message}"
end

Setelah Anda memublikasikan pesan, layanan Pub/Sub akan menampilkan ID pesan kepada penayang.

Menggunakan atribut untuk memublikasikan pesan

Anda dapat menyematkan atribut kustom sebagai metadata dalam pesan Pub/Sub. Atribut digunakan untuk memberikan informasi tambahan tentang pesan, seperti prioritas, asal, atau tujuannya. Atribut juga dapat digunakan untuk memfilter pesan di langganan.

Ikuti panduan berikut untuk menggunakan atribut dalam pesan Anda:

  • Atribut dapat berupa string teks atau string byte.

  • Anda dapat memiliki maksimal 100 atribut per pesan.

  • Kunci atribut tidak boleh diawali dengan goog dan tidak boleh melebihi 256 byte.

  • Nilai atribut tidak boleh melebihi 1.024 byte.

Skema pesan dapat direpresentasikan sebagai berikut:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "orderingKey": string
}

Untuk duplikat sisi publikasi, Anda mungkin melihat nilai publishTime yang berbeda untuk pesan asli sisi klien yang sama, bahkan dengan messageId yang sama.

Skema JSON PubsubMessage dipublikasikan sebagai bagian dari dokumentasi REST dan RPC. Anda dapat menggunakan atribut kustom untuk stempel waktu peristiwa.

Contoh berikut menunjukkan cara memublikasikan pesan dengan atribut ke topik.

Konsol

Untuk memublikasikan pesan dengan atribut, ikuti langkah-langkah berikut:

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka halaman topik Pub/Sub

  2. Klik topik yang ingin Anda publikasikan pesannya.

  3. Di halaman detail topik, klik Messages.

  4. Klik Publikasikan pesan.

  5. Di kolom Isi pesan, masukkan data pesan.

  6. Di bagian Atribut pesan, klik Tambahkan atribut.

  7. Masukkan pasangan nilai kunci.

  8. Tambahkan atribut lainnya, jika diperlukan.

  9. Klik Publikasikan.

gcloud

gcloud pubsub topics publish my-topic --message="hello" \
  --attribute="origin=gcloud-sample,username=gcp,eventTime='2021-01-01T12:00:00Z'"

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;

public class PublishMessageWithCustomAttributesAsyncSample
{
    public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        var pubsubMessage = new PubsubMessage
        {
            // The data is any arbitrary ByteString. Here, we're using text.
            Data = ByteString.CopyFromUtf8(messageText),
            // The attributes provide metadata in a string-to-string dictionary.
            Attributes =
            {
                { "year", "2020" },
                { "author", "unknown" }
            }
        };
        string message = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"Published message {message}");
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithCustomAttributesExample(projectId, topicId);
  }

  public static void publishWithCustomAttributesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder()
              .setData(data)
              .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
              .build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with custom attributes: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: 'nodejs-sample',
    username: 'gcp',
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  const messageId = topic.publishMessage({
    data: dataBuffer,
    attributes: customAttributes,
  });
  console.log(`Message ${messageId} published.`);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Menggunakan kunci pengurutan untuk memublikasikan pesan

Untuk menerima pesan secara berurutan di klien pelanggan, Anda harus mengonfigurasi klien penayang untuk memublikasikan pesan dengan kunci pengurutan.

Untuk memahami konsep pengurutan kunci, lihat Pesan urutan.

Berikut adalah daftar pertimbangan utama untuk pesan yang diurutkan bagi klien penayang:

  • Pengurutan di satu klien penayang: Saat satu klien penayang memublikasikan pesan dengan kunci pengurutan yang sama di wilayah yang sama, klien pelanggan akan menerima pesan tersebut dalam urutan yang sama persis dengan urutan publikasinya. Misalnya, jika klien penayang memublikasikan pesan 1, 2, dan 3 dengan kunci pengurutan A, klien pelanggan akan menerimanya dalam urutan 1, 2, 3.

  • Pengurutan di beberapa klien penayang: Urutan pesan yang diterima oleh klien pelanggan konsisten dengan urutan saat pesan tersebut dipublikasikan di wilayah yang sama, meskipun beberapa klien penayang menggunakan kunci pengurutan yang sama. Namun, klien penayang itu sendiri tidak mengetahui pesanan ini.

    Misalnya, jika klien penayang X dan Y masing-masing memublikasikan pesan dengan kunci pengurutan A, dan pesan X diterima oleh Pub/Sub sebelum pesan Y, semua klien pelanggan akan menerima pesan X sebelum pesan Y. Jika urutan pesan yang ketat di berbagai klien penayang diperlukan, klien tersebut harus menerapkan mekanisme koordinasi tambahan untuk memastikan mereka tidak memublikasikan pesan dengan kunci pengurutan yang sama secara bersamaan. Misalnya, layanan penguncian dapat digunakan untuk mempertahankan kepemilikan kunci pengurutan saat memublikasikan.

  • Pengurutan di seluruh region: Jaminan pengiriman yang dipesan hanya berlaku saat publikasi untuk kunci pengurutan berada di region yang sama. Jika aplikasi penayang Anda memublikasikan pesan dengan kunci pengurutan yang sama ke wilayah yang berbeda, urutan tidak dapat diterapkan di seluruh publikasi tersebut. Pelanggan dapat terhubung ke region mana pun dan jaminan pengurutan tetap dipertahankan.

    Saat Anda menjalankan aplikasi dalam Google Cloud, secara default aplikasi tersebut akan terhubung ke endpoint Pub/Sub di region yang sama. Oleh karena itu, menjalankan aplikasi di satu region dalam Google Cloud umumnya memastikan Anda berinteraksi dengan satu region.

    Saat menjalankan aplikasi penayang di luar Google Cloud atau di beberapa region, Anda dapat memastikan bahwa Anda terhubung ke satu region menggunakan endpoint lokasi saat mengonfigurasi klien Pub/Sub. Semua endpoint lokasi untuk Pub/Sub mengarah ke satu region. Untuk mempelajari endpoint lokasi lebih lanjut, lihat Endpoint Pub/Sub. Untuk mengetahui daftar semua endpoint lokasi untuk Pub/Sub, lihat Daftar endpoint lokasi.

  • Kegagalan publikasi: Jika publikasi dengan kunci pengurutan gagal, pesan yang diantrekan dari kunci pengurutan yang sama di penayang akan gagal, termasuk permintaan publikasi mendatang dari kunci pengurutan ini. Anda harus melanjutkan publikasi dengan kunci pengurutan saat kegagalan tersebut terjadi. Untuk mengetahui contoh melanjutkan operasi publikasi, lihat Mencoba ulang permintaan dengan kunci pengurutan.

Anda dapat memublikasikan pesan dengan kunci pengurutan menggunakan konsol Google Cloud, Google Cloud CLI, Pub/Sub API, atau library klien.

Konsol

Untuk memublikasikan pesan dengan atribut, ikuti langkah-langkah berikut:

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka halaman topik Pub/Sub

  2. Klik topik yang ingin Anda publikasikan pesannya.

  3. Di halaman detail topik, klik Messages.

  4. Klik Publikasikan pesan.

  5. Di kolom Isi pesan, masukkan data pesan.

  6. Di kolom Pengurutan pesan, masukkan kunci pengurutan.

  7. Klik Publikasikan.

gcloud

Untuk memublikasikan pesan dengan kunci pengurutan, gunakan perintah gcloud pubsub topics publish dan flag --ordering-key:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  --ordering-key=ORDERING_KEY

Ganti kode berikut:

  • TOPIC_ID: ID topik
  • MESSAGE_DATA: string dengan data pesan
  • ORDERING_KEY: string dengan kunci pengurutan

REST

Untuk memublikasikan pesan dengan kunci pengurutan, kirim permintaan POST seperti berikut:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Ganti kode berikut:

  • PROJECT_ID: project ID project dengan topik
  • TOPIC_ID: ID topik

Tentukan kolom berikut dalam isi permintaan:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
      "ordering_key": "ORDERING_KEY",
    }
  ]
}

Ganti kode berikut:

  • KEY: kunci atribut pesan
  • VALUE: nilai untuk kunci atribut pesan
  • MESSAGE_DATA: string berenkode base64 dengan data pesan
  • ORDERING_KEY: string dengan kunci pengurutan

Pesan harus berisi kolom data yang tidak kosong atau setidaknya satu atribut.

Jika permintaan berhasil, responsnya adalah objek JSON dengan ID pesan. Contoh berikut adalah respons dengan ID pesan:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto message_id =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + datum.data + "]")
                              .SetOrderingKey(datum.ordering_key)
                              .Build());
    std::string ack_id = datum.ordering_key + "#" + datum.data;
    done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << ack_id << " published with id=" << *id
                << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishOrderedMessagesAsyncSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            // Sending messages to the same region ensures they are received in order even when multiple publishers are used.
            Endpoint = "us-east1-pubsub.googleapis.com:443",
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)
	t.EnableMessageOrdering = true

	messages := []struct {
		message     string
		orderingKey string
	}{
		{
			message:     "message1",
			orderingKey: "key1",
		},
		{
			message:     "message2",
			orderingKey: "key2",
		},
		{
			message:     "message3",
			orderingKey: "key1",
		},
		{
			message:     "message4",
			orderingKey: "key2",
		},
	}

	for _, m := range messages {
		res := t.Publish(ctx, &pubsub.Message{
			Data:        []byte(m.message),
			OrderingKey: m.orderingKey,
		})

		wg.Add(1)
		go func(res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Printf("Failed to publish: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(res)
	}

	wg.Wait()

	if totalErrors > 0 {
		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
		return
	}

	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithOrderingKeysExample(projectId, topicId);
  }

  public static void publishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            // Sending messages to the same region ensures they are received in order
            // even when multiple publishers are used.
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .setEnableMessageOrdering(true)
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub({
  // Sending messages to the same region ensures they are received in order
  // even when multiple publishers are used.
  apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});

async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Be sure to set an ordering key that matches other messages
  // you want to receive in order, relative to each other.
  const message = {
    data: dataBuffer,
    orderingKey: orderingKey,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publishOptions = {
    messageOrdering: true,
  };
  const topic = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  const messageId = topic.publishMessage(message);

  console.log(`Message ${messageId} published.`);

  return messageId;
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    print(future.result())

print(f"Published messages with ordering keys to {topic_path}.")

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new endpoint: "us-east1-pubsub.googleapis.com:443"

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key"
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."

Memantau penayang

Cloud Monitoring menyediakan sejumlah metrik untuk memantau topik.

Untuk memantau topik dan mempertahankan penayang yang sehat, lihat Mempertahankan penayang yang sehat.

Langkah selanjutnya