주제에 메시지 게시

이 문서에서는 메시지 게시에 관한 정보를 제공합니다.

게시자 애플리케이션이 메시지를 만들어 주제로 전송합니다. Pub/Sub에서는 기존 구독자에 최소 1회 메시지 전송 및 최선의 순서 지정을 제공합니다.

게시자 애플리케이션의 일반적인 흐름은 다음과 같습니다.

  1. 사용자 데이터를 포함하는 메시지를 생성합니다.
  2. 지정된 주제에 메시지를 게시하려면 Pub/Sub 서버에 요청을 보냅니다.

시작하기 전에

게시 워크플로를 구성하기 전에 다음 작업이 완료되어야 합니다.

필요한 역할

주제에 메시지를 게시하는 데 필요한 권한을 얻으려면 관리자에게 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

주제 및 구독을 만들거나 업데이트하려면 추가 권한이 필요합니다.

메시지 형식

메시지는 메시지 데이터 및 메타데이터가 있는 필드로 구성됩니다. 메시지에서 다음 중 하나 이상을 지정합니다.

Pub/Sub 서비스는 메시지에 다음 필드를 추가합니다.

  • 주제에 고유한 메시지 ID
  • Pub/Sub 서비스가 메시지를 수신하는 시점의 타임스탬프

메시지에 관한 자세한 내용은 메시지 형식을 참고하세요.

메시지 게시

Google Cloud 콘솔, Google Cloud CLI, Pub/Sub API, 클라이언트 라이브러리를 사용하여 메시지를 게시할 수 있습니다. 클라이언트 라이브러리는 메시지를 비동기식으로 게시할 수 있습니다.

다음 샘플은 주제에 메시지를 게시하는 방법을 보여줍니다.

콘솔

메시지를 게시하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 Pub/Sub 주제 페이지로 이동합니다.

    Pub/Sub 주제 페이지로 이동

  2. 주제 ID를 클릭합니다.

  3. 메시지 아래의 주제 세부정보 페이지에서 메시지 게시를 클릭합니다.

  4. 메시지 본문 필드에 메시지 데이터를 입력합니다.

  5. 게시를 클릭합니다.

gcloud

메시지를 게시하려면 gcloud pubsub topics publish 명령어를 사용합니다.

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  [--attribute=KEY="VALUE",...]

다음을 바꿉니다.

  • TOPIC_ID: 주제의 ID
  • MESSAGE_DATA: 메시지 데이터가 있는 문자열
  • KEY: 메시지 속성의 키
  • VALUE: 메시지 속성의 키 값

REST

메시지를 게시하려면 다음과 같이 POST 요청을 보내세요.

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

다음을 바꿉니다.

  • PROJECT_ID: 주제가 있는 프로젝트의 프로젝트 ID
  • TOPIC_ID: 주제의 ID

요청 본문에 다음 필드를 지정합니다.

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
    }
  ]
}

다음을 바꿉니다.

  • KEY: 메시지 속성의 키
  • VALUE: 메시지 속성의 키 값
  • MESSAGE_DATA: 메시지 데이터가 있는 base64로 인코딩된 문자열

메시지는 비어 있지 않은 데이터 필드 또는 하나 이상의 속성을 포함해야 합니다.

요청이 성공하면 응답은 메시지 ID가 있는 JSON 객체로 받습니다. 다음은 메시지 ID가 포함된 응답의 예시입니다.

{
  "messageIds": [
    "19916711285",
  ]
}

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println("Published message ID: " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(
      `Received error while publishing: ${(error as Error).message}`
    );
    process.exitCode = 1;
  }
}

PHP

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참고 문서를 참조하세요.

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

begin
  topic.publish_async "This is a test message." do |result|
    raise "Failed to publish the message." unless result.succeeded?
    puts "Message published asynchronously."
  end

  # Stop the async_publisher to send all queued messages immediately.
  topic.async_publisher.stop.wait!
rescue StandardError => e
  puts "Received error while publishing: #{e.message}"
end

메시지를 게시하면 Pub/Sub 서비스가 메시지 ID를 게시자에게 반환합니다.

속성을 사용하여 메시지 게시

Pub/Sub 메시지에 커스텀 속성을 메타데이터로 삽입할 수 있습니다. 속성은 우선순위, 원본, 대상과 같은 메시지에 대한 추가 정보를 제공하기 위해 사용됩니다. 속성은 또한 구독에서 메시지를 필터링하는 데 사용할 수 있습니다.

메시지에서 속성을 사용하려면 다음 가이드라인을 따르세요.

  • 속성은 텍스트 문자열이나 바이트 문자열이 지원됩니다.

  • 메시지당 최대 100개의 속성을 사용할 수 있습니다.

  • 속성 키는 goog로 시작하지 않아야 하고 256바이트를 초과하지 않아야 합니다.

  • 속성 값은 1,024바이트를 초과하지 않아야 합니다.

메시지 스키마는 다음과 같이 나타낼 수 있습니다.

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "orderingKey": string
}

게시 측 중복의 경우, 동일한 messageId라도 동일한 클라이언트 측 원본 메시지에 대해 다른 publishTime 값을 보게 될 수 있습니다.

PubsubMessage JSON 스키마는 RESTRPC 문서의 일부로 게시됩니다. 이벤트 타임스탬프에 커스텀 속성을 사용할 수 있습니다.

다음 샘플은 속성이 포함된 메시지를 주제에 게시하는 방법을 보여줍니다.

콘솔

속성이 포함된 메시지를 게시하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    Pub/Sub 주제 페이지로 이동

  2. 메시지를 게시할 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 메시지를 클릭합니다.

  4. 메시지 게시를 클릭합니다.

  5. 메시지 본문 필드에 메시지 데이터를 입력합니다.

  6. 메시지 속성에서 속성 추가를 클릭합니다.

  7. 키-값 쌍을 입력합니다.

  8. 필요한 경우 속성을 더 추가합니다.

  9. 게시를 클릭합니다.

gcloud

gcloud pubsub topics publish my-topic --message="hello" \
  --attribute="origin=gcloud-sample,username=gcp,eventTime='2021-01-01T12:00:00Z'"

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;

public class PublishMessageWithCustomAttributesAsyncSample
{
    public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        var pubsubMessage = new PubsubMessage
        {
            // The data is any arbitrary ByteString. Here, we're using text.
            Data = ByteString.CopyFromUtf8(messageText),
            // The attributes provide metadata in a string-to-string dictionary.
            Attributes =
            {
                { "year", "2020" },
                { "author", "unknown" }
            }
        };
        string message = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"Published message {message}");
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithCustomAttributesExample(projectId, topicId);
  }

  public static void publishWithCustomAttributesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder()
              .setData(data)
              .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
              .build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with custom attributes: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: 'nodejs-sample',
    username: 'gcp',
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  const messageId = topic.publishMessage({
    data: dataBuffer,
    attributes: customAttributes,
  });
  console.log(`Message ${messageId} published.`);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

순서 키를 사용하여 메시지 게시

구독자 클라이언트에서 메시지를 순서대로 수신하려면 게시자 클라이언트가 순서 키로 메시지를 게시하도록 구성해야 합니다.

순서 키의 개념을 이해하려면 메시지 순서 지정을 참조하세요.

다음은 게시자 클라이언트의 순서가 지정된 메시징에 대한 주요 고려사항 목록입니다.

  • 단일 게시자 클라이언트에서 순서 지정: 단일 게시자 클라이언트가 동일한 리전에서 동일한 순서 키로 메시지를 게시하면 구독자 클라이언트는 게시된 정확한 순서대로 메시지를 수신합니다. 예를 들어 게시자 클라이언트가 순서 키 A로 메시지 1, 2, 3을 게시하면 구독자 클라이언트는 1, 2, 3 순으로 메시지를 수신합니다.

  • 여러 게시자 클라이언트 간 순서 지정: 여러 게시자 클라이언트가 동일한 순서 키를 사용하는 경우에도 구독자 클라이언트가 수신한 메시지의 순서는 동일한 리전에 게시된 순서와 일치합니다. 그러나 게시자 클라이언트 자체는 이 순서를 알지 못합니다.

    예를 들어 게시자 클라이언트 X와 Y가 각각 순서 키 A로 메시지를 게시하고 Pub/Sub에서 X의 메시지를 Y보다 먼저 수신하면 모든 구독자 클라이언트는 X의 메시지를 Y보다 먼저 수신합니다. 여러 게시자 클라이언트에 엄격한 메시지 순서가 필요한 경우 해당 클라이언트는 동일한 순서 키로 메시지를 동시에 게시하지 않도록 추가 조정 메커니즘을 구현해야 합니다. 예를 들어 잠금 서비스는 게시 중 순서 키의 소유권을 유지하는 데 사용될 수 있습니다.

  • 리전 간 순서 지정: 제공 순서 보장은 순서 키에 대한 게시가 동일한 리전에서 수행될 때만 적용됩니다. 게시자 애플리케이션이 동일한 순서 키를 사용해서 서로 다른 리전에 메시지를 게시하는 경우 이러한 게시 간에 순서를 적용할 수 없습니다. 구독자는 임의 리전에 연결할 수 있고 순서 보장이 계속 보존됩니다.

    Google Cloud 내에서 애플리케이션을 실행할 때는 기본적으로 동일 리전의 Pub/Sub 엔드포인트에 연결됩니다. 따라서 Google Cloud 내의 단일 리전에서 애플리케이션을 실행하면 일반적으로 단일 리전과 상호작용하도록 보장됩니다.

    Google Cloud 외부 또는 여러 리전에서 게시자 애플리케이션을 실행하는 경우 Pub/Sub 클라이언트를 구성할 때 위치별 엔드포인트를 사용하여 단일 리전에 연결하도록 보장할 수 있습니다. Pub/Sub의 모든 위치 엔드포인트는 단일 리전에 연결됩니다. 위치별 엔드포인트에 대한 자세한 내용은 Pub/Sub 엔드포인트를 참조하세요. Pub/Sub의 모든 위치별 엔드포인트 목록은 위치별 엔드포인트 목록을 참조하세요.

  • 게시 실패: 순서 키로 게시가 실패하면 이 순서 키의 이후 게시 요청을 포함하여 물론 게시자의 동일 순서 키에 대해 큐에 추가된 메시지가 실패합니다. 이러한 오류가 발생하면 순서 키로 게시를 재개해야 합니다. 게시 작업을 재개하는 예시는 순서 키를 사용하여 요청 재시도를 참조하세요.

Google Cloud 콘솔, Google Cloud CLI, Pub/Sub API, 클라이언트 라이브러리를 사용하여 순서 키가 포함된 메시지를 게시할 수 있습니다.

콘솔

속성이 포함된 메시지를 게시하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    Pub/Sub 주제 페이지로 이동

  2. 메시지를 게시할 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 메시지를 클릭합니다.

  4. 메시지 게시를 클릭합니다.

  5. 메시지 본문 필드에 메시지 데이터를 입력합니다.

  6. 메시지 순서 필드에서 순서 키를 입력합니다.

  7. 게시를 클릭합니다.

gcloud

순서 키가 포함된 메시지를 게시하려면 gcloud pubsub topics publish 명령어와 --ordering-key 플래그를 사용합니다.

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  --ordering-key=ORDERING_KEY

다음을 바꿉니다.

  • TOPIC_ID: 주제의 ID
  • MESSAGE_DATA: 메시지 데이터가 있는 문자열
  • ORDERING_KEY: 순서 키가 포함된 문자열

REST

순서 키가 있는 메시지를 게시하려면 다음과 같이 POST 요청을 보냅니다.

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

다음을 바꿉니다.

  • PROJECT_ID: 주제가 있는 프로젝트의 프로젝트 ID
  • TOPIC_ID: 주제의 ID

요청 본문에 다음 필드를 지정합니다.

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
      "ordering_key": "ORDERING_KEY",
    }
  ]
}

다음을 바꿉니다.

  • KEY: 메시지 속성의 키
  • VALUE: 메시지 속성의 키 값
  • MESSAGE_DATA: 메시지 데이터가 있는 base64로 인코딩된 문자열
  • ORDERING_KEY: 순서 키가 포함된 문자열

메시지는 비어 있지 않은 데이터 필드 또는 하나 이상의 속성을 포함해야 합니다.

요청이 성공하면 응답은 메시지 ID가 있는 JSON 객체로 받습니다. 다음은 메시지 ID가 포함된 응답의 예시입니다.

{
  "messageIds": [
    "19916711285",
  ]
}

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto message_id =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + datum.data + "]")
                              .SetOrderingKey(datum.ordering_key)
                              .Build());
    std::string ack_id = datum.ordering_key + "#" + datum.data;
    done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << ack_id << " published with id=" << *id
                << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishOrderedMessagesAsyncSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            // Sending messages to the same region ensures they are received in order even when multiple publishers are used.
            Endpoint = "us-east1-pubsub.googleapis.com:443",
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)
	t.EnableMessageOrdering = true

	messages := []struct {
		message     string
		orderingKey string
	}{
		{
			message:     "message1",
			orderingKey: "key1",
		},
		{
			message:     "message2",
			orderingKey: "key2",
		},
		{
			message:     "message3",
			orderingKey: "key1",
		},
		{
			message:     "message4",
			orderingKey: "key2",
		},
	}

	for _, m := range messages {
		res := t.Publish(ctx, &pubsub.Message{
			Data:        []byte(m.message),
			OrderingKey: m.orderingKey,
		})

		wg.Add(1)
		go func(res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Printf("Failed to publish: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(res)
	}

	wg.Wait()

	if totalErrors > 0 {
		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
		return
	}

	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithOrderingKeysExample(projectId, topicId);
  }

  public static void publishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            // Sending messages to the same region ensures they are received in order
            // even when multiple publishers are used.
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .setEnableMessageOrdering(true)
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub({
  // Sending messages to the same region ensures they are received in order
  // even when multiple publishers are used.
  apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});

async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Be sure to set an ordering key that matches other messages
  // you want to receive in order, relative to each other.
  const message = {
    data: dataBuffer,
    orderingKey: orderingKey,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publishOptions = {
    messageOrdering: true,
  };
  const topic = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  const messageId = topic.publishMessage(message);

  console.log(`Message ${messageId} published.`);

  return messageId;
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    print(future.result())

print(f"Published messages with ordering keys to {topic_path}.")

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new endpoint: "us-east1-pubsub.googleapis.com:443"

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key"
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."

게시자 모니터링

Cloud Monitoring은 주제를 모니터링하기 위한 여러 측정항목을 제공합니다.

주제를 모니터링하고 정상 게시자를 유지하려면 정상 게시자 유지를 참조하세요.

다음 단계