라이트 주제에 메시지를 게시

이 페이지에서는 라이트 주제에 메시지를 게시하는 방법을 설명합니다. 자바용 Pub/Sub 라이트 클라이언트 라이브러리를 사용하여 메시지를 게시할 수 있습니다.

메시지를 게시하고 라이트 주제에 라이트 구독을 만든 후 라이트 구독에서 메시지를 수신할 수 있습니다.

메시지 형식

메시지는 메시지 데이터 및 메타데이터가 있는 필드로 구성됩니다. 메시지에서 다음 중 하나를 지정합니다.

클라이언트 라이브러리는 메시지를 파티션에 자동으로 할당하고 Pub/Sub 라이트 서비스는 메시지에 다음 필드를 추가합니다.

  • 파티션 내에서 고유한 메시지 ID
  • Pub/Sub 라이트 서비스가 파티션에 메시지를 저장하는 경우의 타임스탬프

메시지 게시

메시지를 게시하려면 라이트 주제에 대한 스트리밍 연결을 요청한 다음 스트리밍 연결을 통해 메시지를 전송합니다.

다음 샘플은 라이트 주제에 메시지를 게시하는 방법을 보여줍니다.

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(PublishMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

클라이언트 라이브러리는 메시지를 비동기적으로 전송하고 오류를 처리합니다. 오류가 발생하면 클라이언트 라이브러리가 메시지를 다시 전송합니다.

  1. Pub/Sub 라이트 서비스가 스트림을 닫습니다.
  2. 클라이언트 라이브러리는 메시지를 버퍼링하고 라이트 주제에 대한 연결을 다시 설정합니다.
  3. 클라이언트 라이브러리는 메시지를 순서대로 보냅니다.

메시지를 게시하면 Pub/Sub 라이트 서비스가 메시지를 파티션에 저장하고 게시자에게 메시지 ID를 반환합니다.

순서 키 사용

메시지의 순서 키가 동일한 경우, 클라이언트 라이브러리는 메시지를 동일한 파티션에 할당합니다. 순서 키는 최대 1,024바이트의 문자열이어야 합니다.

순서 키는 메시지의 key 필드에 있습니다. 클라이언트 라이브러리를 사용하여 순서 키를 설정할 수 있습니다.

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

순서 키를 사용하여 동일한 파티션에 여러 메시지를 보낼 수 있으므로 구독자는 메시지를 순서대로 수신합니다. 클라이언트 라이브러리는 동일한 파티션에 여러 순서 키를 할당할 수 있습니다.

속성 사용

메시지 속성은 메시지에 대한 메타데이터가 있는 키-값 쌍입니다. 속성은 텍스트 또는 바이트 문자열일 수 있습니다.

속성은 메시지의 attributes 필드에 있습니다. 클라이언트 라이브러리로 속성을 설정할 수 있습니다.

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-custom-attributes";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

속성은 메시지를 처리하는 방법을 나타낼 수 있습니다. 구독자는 메시지의 attributes 필드를 파싱하고 그 속성에 따라 메시지를 처리할 수 있습니다.

메시지 일괄 처리

클라이언트 라이브러리는 메시지를 일괄 게시합니다. 일괄 처리는 컴퓨팅 리소스를 적게 사용하지만 지연 시간은 증가합니다. 일괄 처리 설정을 사용하여 배치 크기를 변경할 수 있습니다.

다음 표는 사용자가 구성할 수 있는 일괄 처리 설정의 목록입니다.

설정 설명 기본
요청 크기 일괄 처리 최대 크기(바이트)입니다. 3.5MiB
메시지 수 일괄 처리 최대 메시지 수입니다. 메시지 1,000개
게시 지연 메시지를 배치에 추가하고 라이트 주제로 배치를 전송하는 데 걸리는 시간(밀리초)입니다. 50밀리초

클라이언트 라이브러리를 사용하여 일괄 처리 설정을 구성할 수 있습니다.

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publishWithBatchSettingsExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

게시자 애플리케이션이 시작되면 클라이언트 라이브러리가 라이트 주제의 각 파티션에 대한 배치를 만듭니다. 예를 들어 라이트 주제에 파티션이 두 개 있는 경우 게시자는 두 개의 배치를 만들고 각 배치를 파티션으로 보냅니다.

메시지를 게시한 후 클라이언트 라이브러리는 배치가 최대 요청 크기, 최대 메시지 수 또는 게시 지연을 초과할 때까지 버퍼링합니다.

메시지 순서

라이트 주제는 메시지를 게시할 때 각 파티션 내 메시지의 순서를 지정합니다. 동일한 파티션에 메시지를 할당하려면 순서 키를 사용합니다.

Pub/Sub Lite는 파티션에서 메시지를 순서대로 전달하고, 구독자는 메시지를 순서대로 처리할 수 있습니다. 자세한 내용은 메시지 수신을 참조하세요.