클라이언트 라이브러리를 사용하여 Pub/Sub에서 메시지 게시 및 수신

이 페이지에서는 다음을 수행하는 방법을 보여줍니다.

  • Google Cloud CLI를 사용하여 프로젝트를 만들고, 결제를 사용 설정하고, Pub/Sub Lite API를 사용 설정합니다.

  • Google Cloud CLI를 사용하여 라이트 예약, 라이트 주제, 라이트 구독을 만듭니다.

  • Go, 자바, Python용 Pub/Sub Lite 클라이언트 라이브러리를 사용하여 메시지를 주고받습니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Pub/Sub Lite API를 사용 설정합니다.

    gcloud services enable pubsublite.googleapis.com
  7. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  8. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  9. Google Cloud CLI를 설치합니다.
  10. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  11. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  12. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  13. Pub/Sub Lite API를 사용 설정합니다.

    gcloud services enable pubsublite.googleapis.com
  14. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  15. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.

클라이언트 라이브러리 설치

Go

go get cloud.google.com/go/pubsublite

Java

Maven을 사용하는 경우 pom.xml 파일에 다음을 추가합니다. BOM에 대한 자세한 내용은 Google Cloud Platform 라이브러리 BOM을 참조하세요.

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsublite</artifactId>
  <version>1.13.1</version>
</dependency>
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.127.3</version>
</dependency>

Gradle을 사용하는 경우 종속 항목에 다음을 추가합니다.

implementation 'com.google.cloud:google-cloud-pubsublite:1.13.2'

SBT를 사용하는 경우 종속 항목에 다음을 추가합니다.

libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.13.2"

Visual Studio Code, IntelliJ 또는 Eclipse를 사용하는 경우 다음 IDE 플러그인을 사용하여 클라이언트 라이브러리를 프로젝트에 추가할 수 있습니다.

이 플러그인은 서비스 계정의 키 관리와 같은 추가 기능을 제공합니다. 자세한 내용은 각 플러그인의 문서를 참조하세요.

Python

pip install --upgrade google-cloud-pubsublite

라이트 예약 만들기

라이트 예약을 만들려면 gcloud pubsub lite-reservations create 명령어를 사용합니다. 다음 샘플에서는 testRes라는 예약을 만듭니다.

gcloud pubsub lite-reservations create testRes \
    --location=us-central1 \
    --throughput-capacity=1

리전별 라이트 주제 만들기

리전별 라이트 주제를 만들려면 gcloud pubsub lite-topics create 명령어를 사용합니다. 다음 샘플에서는 testTopic이라는 주제를 만듭니다.

gcloud pubsub lite-topics create testTopic \
    --location=us-central1 \
    --partitions=1 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=projects/PROJECT_ID/locations/us-central1/reservations/testRes \

다음을 바꿉니다.

PROJECT_ID: 프로젝트 ID입니다.

라이트 구독 만들기

라이트 구독을 만들려면 gcloud pubsub lite-subscriptions create 명령어를 사용합니다. 다음 샘플에서는 testSub라는 구독을 만듭니다.

gcloud pubsub lite-subscriptions create testSub \
    --location=us-central1 \
    --topic=testTopic \
    --delivery-requirement=deliver-immediately

메시지 보내기

다음 게시자 애플리케이션을 사용하여 라이트 주제로 메시지를 전송합니다.

Go

이 샘플을 사용해 보기 전에 Pub/Sub Lite 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Lite Go API 참조 문서를 확인하세요.

Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub Lite 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Lite Python API 참조 문서를 확인하세요.

Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

게시자가 100개의 메시지를 라이트 주제로 전송하고 Pub/Sub Lite 서비스가 수신하는 메시지 수를 출력합니다.

메시지 수신

다음 구독자 애플리케이션을 사용하여 라이트 구독에서 메시지를 수신합니다.

Go

이 샘플을 사용해 보기 전에 Pub/Sub Lite 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Lite Go API 참조 문서를 확인하세요.

Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub Lite 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Lite Java API 참조 문서를 확인하세요.

Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub Lite 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Lite Python API 참조 문서를 확인하세요.

Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

구독자가 메시지를 수신하면 구독자는 메시지 ID와 메시지 데이터를 출력합니다.

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.

  1. 이 가이드에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 주제와 구독을 삭제합니다.
    gcloud pubsub lite-subscriptions delete testSub --location=us-central1
    gcloud pubsub lite-topics delete testTopic --location=us-central1
    
  2. 선택사항: 만든 사용자 인증 정보를 취소하고 로컬 사용자 인증 정보 파일을 삭제합니다.

    gcloud auth application-default revoke
  3. 선택사항: gcloud CLI에서 사용자 인증 정보를 취소합니다.

    gcloud auth revoke

다음 단계