接收精简版订阅的消息

本页面介绍了如何接收精简版订阅的消息。您可以使用 Java 版 Pub/Sub Lite 客户端库接收消息。

精简版订阅将精简版主题连接到订阅者应用;订阅者从精简版订阅接收消息。订阅者会收到发布商应用向精简版主题发送的所有消息,包括发布者在创建精简版订阅之前发送的消息。

在接收精简版订阅的消息之前,请创建精简版主题,针对精简版主题创建精简版订阅,并发布消息

接收消息

要接收精简版订阅的消息,请向精简版订阅请求消息。客户端库会自动连接到附加到精简版订阅的精简版主题中的分区。 如果多个订阅者客户端进行实例化,则消息将在所有客户端上分发。主题中的分区数量决定了可同时连接到一个订阅的订阅者客户端数上限。

订阅者最长可能需要一分钟才能完成初始化并开始接收消息。初始化后,系统会以最短的延迟时间接收消息。

以下示例展示了如何接收精简版订阅的消息:

gcloud

此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

如需接收消息,请使用 gcloud pubsub lite-subscriptions subscribe 命令:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

请替换以下内容:

  • SUBSCRIPTION_ID:精简版订阅的 ID
  • LITE_LOCATION:精简版订阅的位置

Go

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Go 设置说明进行操作。


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

客户端库会建立与精简版主题中每个分区的双向流式传输连接。

  1. 订阅者请求连接到分区。

  2. Pub/Sub 精简版服务将消息传递给订阅者。

订阅者处理消息后,订阅者必须确认消息。客户端库对回调中的消息进行异步处理和确认。要限制订阅者可以存储在内存中的未确认消息数量,请配置流控制设置

如果多个订阅者从同一精简版订阅接收消息,Pub/Sub 精简版服务会将每个订阅者连接到相同比例的分区。例如,如果两个订阅者使用一个相同的精简版订阅,并且该精简版订阅附加到具有两个分区的精简版主题,则每个订阅者会收到来自其中一个分区的消息。

确认消息

要确认消息,请向精简版订阅发送确认消息。

Go

如需发送确认,请使用 Message.Ack() 方法。

Java

如需发送确认,请使用 AckReplyConsumer.ack() 方法。

Python

如需发送确认,请使用 Message.ack() 方法。

订阅者必须确认每条消息。订阅者会先收到最早的未确认消息,然后接收每个后续消息。如果订阅者跳过一条消息,确认后续消息,然后重新连接,则订阅者会收到未确认的消息以及每条已确认的后续消息。

精简版订阅没有确认时限,并且 Pub/Sub Lite 服务不会通过开放的串流连接重新确认未确认的消息。

使用流控制

Pub/Sub Lite 服务向订阅者传送消息后,订阅者会将未确认的消息存储在内存中。您可以使用流控制设置限制订阅者可在内存中存储的未完成消息数量。流控制设置适用于订阅者接收消息的每个分区。

您可以配置以下流控制设置:

  • 待处理的消息大小。待处理的消息的大小上限(以字节为单位)。最大大小必须大于最大消息的大小。
  • 消息数量待处理消息的最大数量。

消息的大小位于 size_bytes 字段中。您可以使用客户端库配置流控制设置。

Go

如需配置流控制设置,请在调用 pscompat.NewSubscriberClientWithSettings 时传入 ReceiveSettings。您可以在 ReceiveSettings 中设置以下参数:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

如需查看示例,请参阅此流控制示例

Java

如需配置流控制设置,请在 FlowControlRequest.Builder 类中使用以下方法:

Python

如需配置流控制设置,请在 FlowControlSettings 类中设置以下参数:

  • bytes_outstanding

  • messages_outstanding

例如,如果消息数量上限为 100,并且订阅者连接到 10 个分区,则订阅者无法从这 10 个分区收到超过 100 条消息。未完成消息的总数可能超过 100,但订阅者无法从每个分区存储超过 100 条消息。