Menerima pesan dari langganan Lite

Halaman ini menjelaskan cara menerima pesan dari langganan Lite. Anda dapat menerima pesan dengan library klien Pub/Sub Lite untuk Java.

Langganan Lite menghubungkan topik Lite ke aplikasi pelanggan; pelanggan akan menerima pesan dari langganan Lite. Pelanggan menerima setiap pesan yang dikirim aplikasi penayang ke topik Lite, termasuk pesan yang dikirim penerbit sebelum Anda membuat langganan Lite.

Sebelum menerima pesan dari langganan Lite, buat topik Lite, buat langganan Lite ke topik Lite, dan publikasikan pesan ke topik Lite.

Menerima pesan

Untuk menerima pesan dari langganan Lite, minta pesan dari langganan Lite. Library klien akan otomatis terhubung ke partisi dalam topik Lite yang dilampirkan ke langganan Lite. Jika lebih dari satu klien pelanggan dibuat instance-nya, pesan akan didistribusikan ke semua klien. Jumlah partisi dalam topik menentukan jumlah maksimum klien pelanggan yang dapat terhubung ke sebuah langganan secara bersamaan.

Pelanggan mungkin memerlukan waktu hingga satu menit untuk melakukan inisialisasi dan mulai menerima pesan. Setelah inisialisasi, pesan diterima dengan latensi minimal.

Contoh berikut menunjukkan cara menerima pesan dari langganan Lite:

gcloud

Perintah ini memerlukan Python 3.6 atau yang lebih baru, dan harus menginstal paket Python grpcio. Untuk pengguna MacOS, Linux, dan Cloud Shell, jalankan:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Untuk menerima pesan, gunakan perintah gcloud pubsub lite-subscriptions subscribe:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

Ganti kode berikut:

  • SUBSCRIPTION_ID: ID langganan Lite
  • LITE_LOCATION: lokasi langganan Lite

Go

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Go di Library Klien Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Python di Library Klien Pub/Sub Lite.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

Library klien membuat koneksi streaming dua arah ke setiap partisi dalam topik Lite.

  1. Pelanggan meminta koneksi ke partisi.

  2. Layanan Pub/Sub Lite mengirimkan pesan ke pelanggan.

Setelah memproses pesan, pelanggan harus mengonfirmasi pesan. Library klien memproses dan mengonfirmasi pesan dalam callback secara asinkron. Untuk membatasi jumlah pesan yang tidak dikonfirmasi yang dapat disimpan pelanggan di memori, konfigurasikan setelan kontrol alur.

Jika beberapa pelanggan menerima pesan dari langganan Lite yang sama, layanan Pub/Sub Lite akan menghubungkan setiap pelanggan ke proporsi partisi yang sama. Misalnya, jika dua pelanggan menggunakan langganan Lite yang sama dan langganan Lite terkait dengan topik Lite dengan dua partisi, setiap pelanggan akan menerima pesan dari salah satu partisi.

Mengonfirmasi pesan

Untuk mengonfirmasi pesan, kirim konfirmasi ke langganan Lite.

Go

Untuk mengirim konfirmasi, gunakan metode Message.Ack().

Java

Untuk mengirim konfirmasi, gunakan metode AckReplyConsumer.ack().

Python

Untuk mengirim konfirmasi, gunakan metode Message.ack().

Pelanggan harus mengonfirmasi setiap pesan. Pelanggan menerima pesan terlama yang tidak dikonfirmasi terlebih dahulu, diikuti dengan setiap pesan berikutnya. Jika pelanggan melewati satu pesan, mengonfirmasi pesan berikutnya, lalu terhubung kembali, pelanggan akan menerima pesan yang tidak terkonfirmasi dan setiap pesan berikutnya yang dikonfirmasi.

Langganan Lite tidak memiliki batas waktu konfirmasi, dan layanan Pub/Sub Lite tidak mengirim ulang pesan yang tidak dikonfirmasi melalui koneksi streaming terbuka.

Menggunakan kontrol alur

Setelah layanan Pub/Sub Lite mengirimkan pesan kepada pelanggan, pelanggan akan menyimpan pesan yang tidak dikonfirmasi di memori. Anda dapat membatasi jumlah pesan belum terselesaikan yang dapat disimpan pelanggan di memori menggunakan setelan kontrol alur. Setelan kontrol alur berlaku untuk setiap partisi tempat pelanggan menerima pesan.

Anda dapat mengonfigurasi setelan kontrol alur berikut:

  • Ukuran pesan yang melebihi batas. Ukuran maksimum pesan yang belum terselesaikan, dalam byte. Ukuran maksimum harus lebih besar dari ukuran pesan terbesar.
  • Jumlah pesan. Jumlah maksimum pesan yang belum terkirim.

Ukuran pesan ada di kolom size_bytes. Anda dapat mengonfigurasi setelan kontrol alur dengan library klien.

Go

Untuk mengonfigurasi setelan kontrol alur, teruskan ReceiveSettings saat memanggil pscompat.NewSubscriberClientWithSettings. Anda dapat menetapkan parameter berikut di ReceiveSettings:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

Sebagai contoh, lihat contoh kontrol alur ini.

Java

Untuk mengonfigurasi setelan kontrol alur, gunakan metode berikut dalam class FlowControlRequest.Builder:

Python

Untuk mengonfigurasi setelan kontrol alur, tetapkan parameter berikut di class FlowControlSettings:

  • bytes_outstanding

  • messages_outstanding

Misalnya, jika jumlah maksimum pesan adalah 100 dan pelanggan terhubung ke 10 partisi, pelanggan tidak dapat menerima lebih dari 100 pesan dari 10 partisi tersebut. Jumlah total pesan yang belum terselesaikan mungkin lebih dari 100, tetapi pelanggan tidak dapat menyimpan lebih dari 100 pesan dari setiap partisi.