라이트 주제에 메시지 게시

이 페이지에서는 라이트 주제에 메시지를 게시하는 방법을 설명합니다. 자바용 Pub/Sub 라이트 클라이언트 라이브러리를 사용하여 메시지를 게시할 수 있습니다.

메시지를 게시하고 라이트 주제에 라이트 구독을 만든 후 라이트 구독에서 메시지를 수신할 수 있습니다.

메시지 형식

메시지는 메시지 데이터 및 메타데이터가 있는 필드로 구성됩니다. 메시지에서 다음 중 하나를 지정합니다.

클라이언트 라이브러리는 메시지를 파티션에 자동으로 할당하고 Pub/Sub 라이트 서비스는 메시지에 다음 필드를 추가합니다.

  • 파티션 내에서 고유한 메시지 ID
  • Pub/Sub 라이트 서비스가 파티션에 메시지를 저장하는 경우의 타임스탬프

메시지 게시

메시지를 게시하려면 라이트 주제에 대한 스트리밍 연결을 요청한 다음 스트리밍 연결을 통해 메시지를 전송합니다.

다음 샘플은 라이트 주제에 메시지를 게시하는 방법을 보여줍니다.

gcloud

이 명령어를 사용하려면 Python 3.6 이상이 필요하며 grpcio Python 패키지를 설치해야 합니다. MacOS, Linux, Cloud Shell 사용자의 경우 다음을 실행합니다

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

메시지를 게시하려면 gcloud pubsub lite-topics publish 명령어를 사용합니다.

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA

다음을 바꿉니다.

  • TOPIC_ID: 라이트 주제의 ID
  • LITE_LOCATION: 라이트 주제의 위치
  • MESSAGE_DATA: 메시지 데이터가 있는 문자열

Go

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Go 설정 안내를 따르세요.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

클라이언트 라이브러리는 메시지를 비동기적으로 전송하고 오류를 처리합니다. 오류가 발생하면 클라이언트 라이브러리가 메시지를 다시 전송합니다.

  1. Pub/Sub 라이트 서비스가 스트림을 닫습니다.
  2. 클라이언트 라이브러리는 메시지를 버퍼링하고 라이트 주제에 대한 연결을 다시 설정합니다.
  3. 클라이언트 라이브러리는 메시지를 순서대로 보냅니다.

메시지를 게시하면 Pub/Sub 라이트 서비스가 메시지를 파티션에 저장하고 게시자에게 메시지 ID를 반환합니다.

순서 키 사용

메시지의 순서 키가 동일한 경우, 클라이언트 라이브러리는 메시지를 동일한 파티션에 할당합니다. 순서 키는 최대 1,024바이트의 문자열이어야 합니다.

순서 키는 메시지의 key 필드에 있습니다. 클라이언트 라이브러리를 사용하여 순서 키를 설정할 수 있습니다.

gcloud

이 명령어를 사용하려면 Python 3.6 이상이 필요하며 grpcio Python 패키지를 설치해야 합니다. MacOS, Linux, Cloud Shell 사용자의 경우 다음을 실행합니다

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

메시지를 게시하려면 gcloud pubsub lite-topics publish 명령어를 사용합니다.

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --ordering-key=ORDERING_KEY \
    --message=MESSAGE_DATA

다음을 바꿉니다.

  • TOPIC_ID: 라이트 주제의 ID
  • LITE_LOCATION: 라이트 주제의 위치
  • ORDERING_KEY: 파티션에 메시지 할당을 위해 사용되는 문자열
  • MESSAGE_DATA: 메시지 데이터가 있는 문자열

Go

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Go 설정 안내를 따르세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

순서 키를 사용하여 동일한 파티션에 여러 메시지를 보낼 수 있으므로 구독자는 메시지를 순서대로 수신합니다. 클라이언트 라이브러리는 동일한 파티션에 여러 순서 키를 할당할 수 있습니다.

이벤트 시간 설정

이벤트 시간을 사용하여 라이트 메시지를 게시할 수 있습니다. 이벤트 시간은 메시지에 추가할 수 있는 커스텀 속성입니다.

클라이언트 라이브러리 또는 gCloud CLI를 사용하여 이벤트 타임스탬프를 설정할 수 있습니다.

이 명령어를 사용하려면 Python 3.6 이상이 필요하며 grpcio Python 패키지를 설치해야 합니다. MacOS, Linux, Cloud Shell 사용자의 경우 다음을 실행합니다

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

메시지를 게시하려면 gcloud pubsub lite-topics publish 명령어를 사용합니다.

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --event-time=EVENT_TIME \
    --message=MESSAGE_DATA

다음을 바꿉니다.

  • TOPIC_ID: 라이트 주제의 ID

  • LITE_LOCATION: 라이트 주제의 위치

  • EVENT_TIME: 사용자 지정 이벤트 시간 시간 형식에 대한 자세한 내용은 gcloud topic datetimes를 실행하세요.

  • MESSAGE_DATA: 메시지 데이터가 있는 문자열

속성 사용

메시지 속성은 메시지에 대한 메타데이터가 있는 키-값 쌍입니다. 속성은 텍스트 또는 바이트 문자열일 수 있습니다.

속성은 메시지의 attributes 필드에 있습니다. 클라이언트 라이브러리로 속성을 설정할 수 있습니다.

gcloud

이 명령어를 사용하려면 Python 3.6 이상이 필요하며 grpcio Python 패키지를 설치해야 합니다. MacOS, Linux, Cloud Shell 사용자의 경우 다음을 실행합니다

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

메시지를 게시하려면 gcloud pubsub lite-topics publish 명령어를 사용합니다.

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA \
    --attribute=KEY=VALUE,...

다음을 바꿉니다.

  • TOPIC_ID: 라이트 주제의 ID
  • LITE_LOCATION: 라이트 주제의 위치
  • MESSAGE_DATA: 메시지 데이터가 있는 문자열
  • KEY: 메시지 속성의 키
  • VALUE: 메시지 속성의 키 값

Go

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Go 설정 안내를 따르세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish a message with custom attributes.
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("message-with-custom-attributes"),
		Attributes: map[string]string{
			"year":   "2020",
			"author": "unknown",
		},
	})

	// Get blocks until the result is ready.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("publish error: %w", err)
	}

	fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
	return publisher.Error()
}

Java

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    // Prepare the message data as a byte string.
    String messageData = "message-with-custom-attributes";
    ByteString data = ByteString.copyFromUtf8(messageData);

    // Prepare a protobuf-encoded event timestamp for the message.
    Instant now = Instant.now();
    String eventTime =
        MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));

    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            // Add an event timestamp as an attribute.
            .putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve api futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")

속성은 메시지를 처리하는 방법을 나타낼 수 있습니다. 구독자는 메시지의 attributes 필드를 파싱하고 그 속성에 따라 메시지를 처리할 수 있습니다.

메시지 일괄 처리

클라이언트 라이브러리는 메시지를 일괄 게시합니다. 일괄 처리는 컴퓨팅 리소스를 적게 사용하지만 지연 시간은 증가합니다. 일괄 처리 설정을 사용하여 배치 크기를 변경할 수 있습니다.

다음 표는 사용자가 구성할 수 있는 일괄 처리 설정의 목록입니다.

설정 설명 기본
요청 크기 일괄 처리 최대 크기(바이트)입니다. 3.5MiB
메시지 수 일괄 처리 최대 메시지 수입니다. 메시지 1,000개
게시 지연 메시지를 배치에 추가하고 라이트 주제로 배치를 전송하는 데 걸리는 시간(밀리초)입니다. 50밀리초

클라이언트 라이브러리를 사용하여 일괄 처리 설정을 구성할 수 있습니다.

Go

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Go 설정 안내를 따르세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Batch settings control how the publisher batches messages. These settings
	// apply per partition.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	settings := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024, // 5 KiB
		CountThreshold: 1000,     // 1,000 messages
		DelayThreshold: 100 * time.Millisecond,
	}

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish requests are sent to the server based on request size, message
	// count and time since last publish, whichever condition is met first.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}
		fmt.Fprintf(w, "Published: %v\n", id)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
	return publisher.Error()
}

Java

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)
batch_setttings = BatchSettings(
    # 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Default to 50 ms.
    max_latency=0.1,
    # Default to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_setttings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)

게시자 애플리케이션이 시작되면 클라이언트 라이브러리가 라이트 주제의 각 파티션에 대한 배치를 만듭니다. 예를 들어 라이트 주제에 파티션이 두 개 있는 경우 게시자는 두 개의 배치를 만들고 각 배치를 파티션으로 보냅니다.

메시지를 게시한 후 클라이언트 라이브러리는 배치가 최대 요청 크기, 최대 메시지 수 또는 게시 지연을 초과할 때까지 버퍼링합니다.

메시지 순서 지정

라이트 주제는 메시지를 게시할 때 각 파티션 내 메시지의 순서를 지정합니다. 동일한 파티션에 메시지를 할당하려면 순서 키를 사용합니다.

Pub/Sub Lite는 파티션에서 메시지를 순서대로 전달하고, 구독자는 메시지를 순서대로 처리할 수 있습니다. 자세한 내용은 메시지 수신을 참조하세요.

게시 멱등성

Pub/Sub 라이트 클라이언트 라이브러리는 다음 버전에서 게시 멱등성을 지원합니다.

네트워크 오류나 서버 오류로 인해 메시지 게시가 재시도되는 경우 정확히 한 번만 저장됩니다. 멱등성은 동일한 세션 내에서만 보장됩니다. 새 게시자 클라이언트를 사용하여 동일한 메시지를 다시 게시하는 경우 보장할 수 없습니다. 추가 서비스 비용이 발생하거나 게시 지연 시간이 늘어나지 않습니다.

게시 멱등성 사용 설정 또는 중지

게시 멱등성은 기본적으로 Pub/Sub 라이트 클라이언트 라이브러리에서 사용 설정됩니다. 각 클라이언트 라이브러리의 게시자 클라이언트 설정을 사용하여 이를 사용 중지할 수 있습니다.

게시 멱등성이 사용 설정된 경우 게시 결과에서 반환되는 오프셋은 -1일 수 있습니다. 메시지가 이미 게시된 메시지의 중복으로 식별되었지만 게시 시점에 메시지의 오프셋을 반환할 정보가 서버에 충분하지 않은 경우 이 값이 반환됩니다. 구독자가 수신한 메시지에는 항상 유효한 오프셋이 있습니다.

문제 해결

중복 수신됨

멱등성은 단일 세션으로 제한되므로 동일한 메시지를 게시하도록 게시자 클라이언트를 다시 만들면 중복이 발생할 수 있습니다.

파티션이 Pub/Sub 라이트 서비스(기본 설정)에 의해 구독자에게 자동으로 할당되는 경우 구독자 클라이언트는 동일한 메시지를 여러 번 수신할 수 있습니다. 재할당이 발생하면 메시지가 다른 구독자 클라이언트로 다시 전송될 수 있습니다.

게시자 오류

게시자 세션의 상태는 7일 동안 활동이 없으면 서버에서 가비지로 수집됩니다. 이 기간이 지난 후 세션이 재개되면 게시자 클라이언트가 '실패한 전제조건: 게시 메시지의 번호 지정이 필요합니다'와 같은 오류 메시지와 함께 종료되며 새 메시지를 허용하지 않습니다. 이 오류를 해결하려면 게시자 클라이언트를 다시 만드세요.