订购消息

消息排序是 Pub/Sub 中的一项功能,可让您接收消息 会按照 。

例如,假设某个区域中的发布者客户端发布了消息 1、2、 依次为 3 和 3。通过消息排序,订阅方客户端会收到 以相同的顺序发布消息。要按顺序投放 客户端必须在相同的 region [地区]

消息排序功能对于数据库更改等场景非常有用 捕获、用户会话跟踪和流式传输应用,其中保留了 事件的年表非常重要

本页面介绍了消息排序的概念以及如何设置 订阅者客户端按顺序接收消息。要配置您的发布商,请执行以下操作: 请参阅使用排序键发布 消息

消息排序概览

Pub/Sub 中的排序取决于以下因素:

  • 排序键:这是用于 Pub/Sub 消息元数据,表示 对消息进行排序。排序键的长度不得超过 1 KB。接收者 在某个区域中收到一组有序消息,则必须发布所有消息 并使用相同的排序键。一些排序示例 键是客户 ID 和数据库中行的主键。

    每个排序键的发布吞吐量限制为 1 MBps。通过 主题上所有排序键的吞吐量受限于配额 发布区域提供。您可以提高此限额 许多单位的 GBps。

    排序键不等同于基于分区的分区 因为排序键预计会比消息 基数大于分区数。

  • 启用消息排序:这是订阅设置。当 订阅已启用消息排序功能,订阅者客户端会收到 在 服务接收它们的顺序。您必须启用此设置

    假设您有两个订阅 A 和 B 附加到同一主题 T。订阅 A 配置了消息排序功能,并且该订阅 B 配置为未启用消息排序。在此架构中, 订阅 A 和订阅 B 从主题 T 接收同一组消息。如果 则在同一区域(订阅 A)中发布具有排序键的消息 按发布顺序接收消息然而, 订阅 B 接收的消息未按预期排序。

一般来说,如果您的解决方案要求发布商客户同时发送有序和 非排序消息,请创建单独的主题,一个用于有序消息, other 表示无序消息。

使用有序消息功能的注意事项

以下列表包含有关 Pub/Sub 中的有序消息传递:

  • 键内排序:如果发布的消息具有相同的排序键, 预计会按顺序收到假设对于排序键 A, 用于发布消息 1、2 和 3。启用排序后,预计 1 第 2 步和第 2 步之前投放的广告预计会在第 3 步之前送达。

  • 跨键排序:如果发布的消息具有不同的排序键,则 订单,而不是按顺序接收。假设您有排序键 A 和 B。 对于排序键 A,消息 1 和 2 按顺序发布。订餐专用 键 B,则按顺序发布消息 3 和消息 4。然而,消息 1 在消息 4 之前或之后到达。

  • 消息重新提交:Pub/Sub 会传送每条消息 至少一次 Pub/Sub 服务可能会重新提交消息。重新提交 消息会触发系统重新提交该键的所有后续消息,即使 是公认的假设订阅方客户端接收消息 1、2、 3 表示特定排序键。如果邮件 2 被重新提交(因为 确认截止期限已过,或者尽力确认未成功 会保留在 Pub/Sub 中),那么消息 3 也会被重新提交。如果 订阅同时启用了消息排序和死信主题,则此行为可能不正确,因为 Pub/Sub 会尽最大努力将消息转发到死信主题。

  • 确认延迟和死信主题:未确认的消息 都可能会延迟 其他排序键,尤其是在服务器重启或流量变化期间。 为了保持这些事件之间的顺序,请务必及时确认所有 消息。如果无法及时确认,请考虑使用 死信主题,以防止无限期保留消息。请注意 消息写入死信主题时,可能不会保留。

  • 消息亲和性(streamingPull 客户端):同一个键的消息 通常传送到同一个 streamPull 订阅者客户端。如果特定订阅方客户端的排序键有未处理的消息,则预计会出现亲和度。如果没有未完成的消息,则相似性 负载均衡或客户端断开连接。

    为确保顺利处理,即使可能发生相似性更改也是如此, 在设计 streamPull 应用 任何客户端中的消息。

  • 与 Dataflow 集成:不要为以下内容启用消息排序: 配置 Dataflow 时启用 Pub/Sub。Dataflow 有自己的 确保所有消息按时间顺序排列, 部分数据选取操作。这种排序方法不同于 Pub/Sub 的基于键的排序方法。使用排序键 可能会降低流水线性能。

  • 自动伸缩:Pub/Sub 的有序传送可扩容到 数十亿个订购键。排序键的数量越多 因为排序适用于所有消息 相同的排序键。

订外卖确实有一些需要权衡的因素。与无序相比 有序提交可能会略微降低发布可用性 增加端到端消息传送延迟时间。在已订购订单中 故障切换需要协调工作,以确保消息写入和读取 。

如需详细了解如何使用消息排序,请参阅 最佳实践主题:

消息排序的订阅者客户端行为

订阅方客户端会按照 特定区域Pub/Sub 支持 消息,例如连接到拉取和推送的订阅者客户端 订阅。客户端库使用 streamingPull (PHP 除外)。

如需详细了解这些订阅类型,请参阅选择订阅类型

以下部分讨论了按顺序接收消息的含义 提供两种服务

StreamingPull 订阅者客户端

将客户端库与 streamPull 结合使用时,您必须指定一个用户 每当订阅者客户端收到消息时运行的回调函数。 使用客户端库时,对于任何给定的排序键,回调都会运行 按正确的顺序对消息进行补全。如果这些消息 在该回调中确认消息之后,对消息的所有计算都会发生 。但是,如果用户回调安排了其他异步工作 订阅方客户端必须确保异步工作 都是按顺序完成的一种方法是将消息添加到本地工作队列, 将按顺序处理

拉取订阅方客户端

对于连接到拉取订阅的订阅者客户端,Pub/Sub 消息排序支持下列各项:

  • PullResponse 中排序键的所有消息 它们在列表中的顺序正确

  • 一个排序键只能有一批消息未完成 。

同一时段只能有一批邮件处于未完成状态。 因为 Pub/Sub 服务无法确保它发送的响应成功或延迟 发出 HTTP 请求

推送订阅者客户端

对推的限制比拉对的限制更加严格。 对于推送订阅,Pub/Sub 仅支持一项未完成的订阅 显示一条消息。每封邮件都会发送到 推送端点作为单独的请求因此,将请求发出 与同时投放多批次 同一排序键的消息,以同时拉取订阅者。 推送订阅可能不适用于消息 使用同一个排序键频繁发布,或 极其重要。

导出订阅者客户端

导出订阅支持有序消息。对于 BigQuery 则具有相同排序键的消息会分别写入 BigQuery 表(按顺序)。对于 Cloud Storage 订阅 排序键相同的消息可能不会全部写入同一个文件。 在同一文件中,排序键的消息按顺序显示。时间 则排序键的消息可能会出现在 文件名中的时间戳早于 包含之前邮件的文件。

启用消息排序

要按顺序接收消息,请在从中接收消息的订阅上设置消息排序属性。按顺序接收消息 延迟时间增加完成设置后,您便无法更改消息排序属性 创建订阅。

使用以下代码创建订阅时,您可以设置消息排序属性 Google Cloud 控制台、Google Cloud CLI 或 Pub/Sub API。

控制台

要使用消息排序属性创建订阅,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入订阅页面。

前往订阅页面

  1. 点击创建订阅

  2. 输入订阅 ID

  3. 选择要接收哪个主题中的消息。

  4. 消息排序部分,选择使用排序键对消息排序

  5. 点击创建

gcloud

要创建具有消息排序属性的订阅,请使用 gcloud pubsub subscriptions create 命令和 --enable-message-ordering 标志:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID 替换为订阅的 ID。

如果请求成功,命令行会显示一条确认消息:

Created subscription [SUBSCRIPTION_ID].

REST

如需使用消息排序属性创建订阅,请发送 PUT 请求,如下所示:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • SUBSCRIPTION_ID:订阅的 ID

在请求正文中,指定以下内容:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID 替换为要附加到订阅的主题的 ID。

如果请求成功,则响应为 JSON 格式的订阅:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

后续步骤