메시지 게시

이 문서에서는 메시지 게시에 관한 정보를 제공합니다.

게시자 애플리케이션이 메시지를 만들어 주제로 전송합니다. 구독자 개요에서 설명하는 것처럼, Pub/Sub는 기존 구독자에 최소 1회의 메시지 전송과 최선의 순서 지정을 제공합니다.

게시자 애플리케이션의 일반적인 흐름은 다음과 같습니다.

  1. 사용자 데이터를 포함하는 메시지를 생성합니다.
  2. 원하는 주제에 메시지를 게시하려면 Pub/Sub 서버에 요청을 보냅니다.

설정

원하는 프로그래밍 언어의 환경 설정 방법은 클라이언트 라이브러리 시작 가이드를 참조하세요.

주제에 메시지 게시

REST를 통해 JSON을 사용하는 경우, 메시지 데이터는 base64로 인코딩되어야 합니다. 하나 이상의 메시지를 포함하는 전체 요청은 디코딩 후 10MB보다 작아야 합니다. 메시지 페이로드는 비어 있으면 안 됩니다. 비어 있지 않은 데이터 필드나 하나 이상의 속성을 포함해야 합니다.

선택한 프로그래밍 언어에 따라 클라이언트 라이브러리는 메시지를 동기적 또는 비동기적으로 게시합니다. 비동기 게시는 일괄 처리를 지원하고 애플리케이션 처리량을 높입니다.

모든 클라이언트 라이브러리는 메시지 게시를 비동기적으로 지원합니다. 동기적 메시지 게시를 원한다면 선택한 프로그래밍 언어의 API 참조 문서를 참고해 해당 언어의 클라이언트 라이브러리가 동기적 메시지 게시를 지원하는지 확인하세요.

서버에서 생성한 ID(주제별로 하나씩만 존재)는 메시지 게시 성공 시 반환됩니다.

프로토콜

요청:

요청은 Authorization 헤더의 액세스 토큰으로 인증해야 합니다. 현재 애플리케이션 기본 사용자 인증 정보에 대한 액세스 토큰을 얻는 방법은 다음과 같습니다. gcloud auth application-default print-access-token.

POST     https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
Authorization: Bearer ACCESS_TOKEN

요청 본문에 다음 필드를 지정합니다.

{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

응답:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

명령줄

gcloud pubsub topics publish my-topic --message "hello"

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId));

go

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

자바

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {

          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // details on the API exception
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + message);
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic)
            System.out.println(messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

php

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

futures = dict()

def get_callback(f, data):
    def callback(f):
        try:
            print(f.result())
            futures.pop(data)
        except:  # noqa
            print("Please handle {} for {}.".format(f.exception(), data))

    return callback

for i in range(10):
    data = str(i)
    futures.update({data: None})
    # When you publish a message, the client returns a future.
    future = publisher.publish(
        topic_path, data=data.encode("utf-8")  # data must be a bytestring.
    )
    futures[data] = future
    # Publish failures shall be handled in the callback function.
    future.add_done_callback(get_callback(future, data))

# Wait for all the publish futures to resolve before exiting.
while futures:
    time.sleep(5)

print("Published message with error handler.")

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

새 커스텀 속성

Pub/Sub 메시지에 커스텀 속성을 메타데이터로 삽입할 수 있습니다. 속성은 텍스트 문자열이나 바이트 문자열이 지원됩니다. 메시지 스키마는 다음과 같이 나타낼 수 있습니다.

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

PubsubMessage JSON 스키마는 RESTRPC 문서의 일부로 게시됩니다.

go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print("Published messages with custom attributes.")

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

지연 시간 및 처리량의 균형을 유지하기 위한 일괄 처리

Pub/Sub 클라이언트 라이브러리는 여러 메시지를 서비스에 대한 단일 호출로 일괄 처리합니다. 배치 크기가 커지면 메시지 처리량(CPU별 전송 메시지 비율)이 상승합니다. 일괄 처리의 단점은 개별 메시지의 지연 시간 증가인데, 이러한 메시지는 해당 배치가 채워져 네트워크로 전송될 준비가 될 때까지 메모리에서 큐에 추가됩니다. 지연 시간을 최소화하려면 일괄 처리를 사용 중지해야 합니다. 이는 요청-응답 시퀀스의 일부로 단일 메시지를 게시하는 애플리케이션에 특히 중요합니다. 이러한 패턴의 대표적인 예시는 Cloud Functions 또는 App Engine을 사용하는.서버리스 이벤트 기반 애플리케이션에서 볼 수 있습니다.

메시지는 요청 크기(단위: 바이트), 메시지 수, 시간을 기준으로 일괄 처리할 수 있습니다. 이 샘플에서처럼 기본 설정을 재정의할 수도 있습니다.

c#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId),
    settings: new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

go

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func publishWithSettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	t.PublishSettings.ByteThreshold = 5000
	t.PublishSettings.CountThreshold = 10
	t.PublishSettings.DelayThreshold = 100 * time.Millisecond

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

자바

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 10L; // default : 1 message

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(messageCountBatchSize)
        .setRequestByteThreshold(requestBytesThreshold)
        .setDelayThreshold(publishDelayThreshold)
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const batchPublisher = pubsub.topic(topicName, {
  batching: {
    maxMessages: maxMessages,
    maxMilliseconds: maxWaitTime,
  },
});

for (let i = 0; i < 10; i++) {
  (async () => {
    const messageId = await batchPublisher.publish(dataBuffer);
    console.log(`Message ${messageId} published.`);
  })();
}

php

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * The publisher should be used in conjunction with the `google-cloud-batch`
 * daemon, which should be running in the background.
 *
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message    The message to publish.
 */
function publish_message_batch($projectId, $topicName, $message)
{
    // Check if the batch daemon is running.
    if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
        trigger_error(
            'The batch daemon is not running. Call ' .
            '`vendor/bin/google-cloud-batch daemon` from ' .
            'your project root to start the daemon.',
            E_USER_NOTICE
        );
    }

    $batchOptions = [
        'batchSize' => 100, // Max messages for each batch.
        'callPeriod' => 0.01, // Max time in seconds between each batch publish.
    ];

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $publisher = $topic->batchPublisher([
        'batchOptions' => $batchOptions
    ]);

    for ($i = 0; $i < 10; $i++) {
        $publisher->publish(['data' => $message]);
    }

    print('Messages enqueued for publication.' . PHP_EOL);
}

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024, max_latency=1  # One kilobyte  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with batch settings.")

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

재시도 요청

재시도를 보증할 수 없는 오류를 제외하고, 실패한 게시는 자동으로 다시 시도됩니다. 본 샘플 코드는 커스텀 재시도 설정을 적용한 게시자 생성 방법을 보여줍니다. 커스텀 재시도 설정을 지원하지 않는 클라이언트 라이브러리도 있습니다. 자세한 내용은 선택한 언어의 API 참조 문서에서 확인하세요.

자바

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 1 minute
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 10 minutes
Duration totalTimeout = Duration.ofSeconds(600); // default: 10 minutes

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(retryDelay)
        .setRetryDelayMultiplier(retryDelayMultiplier)
        .setMaxRetryDelay(maxRetryDelay)
        .setInitialRpcTimeout(initialRpcTimeout)
        .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
        .setMaxRpcTimeout(maxRpcTimeout)
        .setTotalTimeout(totalTimeout)
        .build();

Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

node.js

// Imports the Google Cloud client library
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client
const client = new v1.PublisherClient({
  // optional auth parameters
});

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectId = 'my-project-id'
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

const formattedTopic = client.topicPath(projectId, topicName);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
  data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
  topic: formattedTopic,
  messages: messages,
};

// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
  retryCodes: [
    10, // 'ABORTED'
    1, // 'CANCELLED',
    4, // 'DEADLINE_EXCEEDED'
    13, // 'INTERNAL'
    8, // 'RESOURCE_EXHAUSTED'
    14, // 'UNAVAILABLE'
    2, // 'UNKNOWN'
  ],
  backoffSettings: {
    initialRetryDelayMillis: 100,
    retryDelayMultiplier: 1.3,
    maxRetryDelayMillis: 60000,
    initialRpcTimeoutMillis: 5000,
    rpcTimeoutMultiplier: 1.0,
    maxRpcTimeoutMillis: 600000,
    totalTimeoutMillis: 600000,
  },
};

const [response] = await client.publish(request, {retry: retrySettings});
console.log(`Message ${response.messageIds} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the retry settings. Defaults will be overwritten.
retry_settings = {
    "interfaces": {
        "google.pubsub.v1.Publisher": {
            "retry_codes": {
                "publish": [
                    "ABORTED",
                    "CANCELLED",
                    "DEADLINE_EXCEEDED",
                    "INTERNAL",
                    "RESOURCE_EXHAUSTED",
                    "UNAVAILABLE",
                    "UNKNOWN",
                ]
            },
            "retry_params": {
                "messaging": {
                    "initial_retry_delay_millis": 100,  # default: 100
                    "retry_delay_multiplier": 1.3,  # default: 1.3
                    "max_retry_delay_millis": 60000,  # default: 60000
                    "initial_rpc_timeout_millis": 5000,  # default: 25000
                    "rpc_timeout_multiplier": 1.0,  # default: 1.0
                    "max_rpc_timeout_millis": 600000,  # default: 30000
                    "total_timeout_millis": 600000,  # default: 600000
                }
            },
            "methods": {
                "Publish": {
                    "retry_codes_name": "publish",
                    "retry_params_name": "messaging",
                }
            },
        }
    }
}

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with retry settings.")

재시도 설정은 총 재시도 횟수와 지수 백오프를 제어합니다(클라이언트가 후속 재시도 사이에 대기하는 시간). 초기 PRC 시간 초과는 클라이언트가 재시도하기 전에 초기 PRC가 성공할 때까지 기다리는 시간입니다. 총 시간 초과는 클라이언트가 재시도를 중지하기 전에 기다리는 시간입니다. 게시 요청을 다시 시도하려면 초기 PRC 시간 초과가 총 시간 초과보다 짧아야 합니다.

첫 번째 RPC가 실패하거나 시간이 초과되면 지수 백오프 로직은 후속 재시도 시기를 결정합니다. 다시 시도할 때마다 RPC 시간 초과는 최대 RPC 시간 초과까지 이 배율로 증가합니다. 또한 재시도 지연 설정은 클라이언트가 오류 또는 시간 초과가 발생하여 다음 요청을 시작하는 시간을 결정합니다.

동시 실행 제어

동시 실행 지원 여부는 프로그래밍 언어에 따라 다릅니다. 자세한 내용은 API 참조 문서를 참조하세요.

다음 샘플은 게시자의 동시 실행을 제어하는 방법을 보여줍니다.

go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

자바

// create a publisher with a single threaded executor
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Publisher publisher =
    Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with futures.")

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!