Pub/Sub 推送订阅的载荷解封

构建 Pub/Sub 系统时,载荷解封会有所帮助 连接到不满足特定系统要求的 标准 Pub/Sub 推送端点实现。

载荷解封装的一些可能用例如下:

  • 您不想为 HTTP 推送端点编写特定于 Pub/Sub 的消息解析代码。
  • 您更希望接收 Pub/Sub 消息元数据作为 HTTP 标头 而不是 HTTP POST 正文中的元数据。
  • 您想要发送 Pub/Sub 消息 Pub/Sub 元数据,例如在将数据发送到 第三方 API。

载荷解封的工作原理

载荷解封是一项功能,用于剥离 Pub/Sub 所有消息元数据(消息数据除外)的消息。通过发送原始消息数据,订阅者无需遵守 Pub/Sub 的任何系统要求,即可处理消息。

  • 通过载荷展开,消息数据会直接作为 HTTP 正文传送。
  • 如果不解封载荷,Pub/Sub 将提供一个 JSON 对象, 包含多个消息元数据字段和一个消息数据字段。在这种情况下,必须解析 JSON 以检索消息数据,然后进行 base64 解码。

写入元数据

启用载荷解封后,您可以使用写入元数据选项,将之前移除的消息元数据添加到请求标头中。

  • 已启用元数据写入权限。将消息元数据重新添加到请求标头中。还会传送已解码的原始消息数据。
  • 写入元数据已停用。仅传送已解码的原始消息数据。

写入元数据通过 Pub/Sub、Google Cloud CLI 参数 --push-no-wrapper-write-metadata 和 API 属性 NoWrapper 公开。默认情况下,此值为 null。

准备工作

已封装和解封装的消息示例

以下示例说明了发送封装的 HTTP 消息与未封装的 HTTP 消息之间的区别。在这些示例中,消息数据包含 字符串 {"status": "Hello there"}

在此示例中,创建订阅时将载荷解封 功能启用并向 mytopic 发布消息。它使用值为 some-key 的排序键,并且媒体类型声明为 application/json

gcloud pubsub topics publish mytopic
   --message='{"status": "Hello there"}'
   --ordering-key="some-key"
   --attribute "Content-Type=application/json"

以下部分介绍了封装消息和未封装消息之间的区别。

换行消息

以下示例展示了一条标准 Pub/Sub 封装消息。在本例中,未启用载荷解封装。

发布 推送端点接收次数
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 361
Content-Type: application/json
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{
  "message": {
      "attributes": {
          "Content-Type": "application/json"
      },
      "data": "eyJzdGF0dXMiOiAiSGVsbG8gdGhlcmUifQ==", //  Base64 - {"status": "Hello there"}
      "messageId": "2070443601311540",
      "message_id": "2070443601311540",
      "publishTime": "2021-02-26T19:13:55.749Z",
      "publish_time": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/..."
}

已停用写入元数据的解封消息

以下示例展示了包含“写入元数据”选项的解封装消息 已停用。在本例中,不包含 x-goog-pubsub-* 标头和邮件属性

发布 推送端点接收
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 25
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

启用了写入元数据的未封装消息

以下示例展示了启用了“写入元数据”选项的已取消封装消息。在本例中,包含 x-goog-pubsub-* 标头和邮件属性

发布 推送端点接收
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
x-goog-pubsub-subscription-name: "projects/myproject/..."
x-goog-pubsub-message-id: "2070443601311540"
x-goog-pubsub-publish-time: "2021-02-26T19:13:55.749Z"
x-goog-pubsub-ordering-key: "some-key"
Content-Type: application/json
Content-Length: 12
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

配置载荷解封

您可以为订阅启用载荷解封推送传送 使用 Google Cloud 控制台的订阅详情页面、Google Cloud CLI 或客户端库

控制台

  1. 在 Google Cloud 控制台中,进入订阅页面。

    打开 Pub/Sub 订阅

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入名称。

    如需了解如何命名订阅,请参阅主题或订阅命名指南

  4. 从下拉菜单中选择一个主题。订阅接收消息 。

  5. 对于传送类型,选择推送

  6. 如需启用载荷解封,请选择启用载荷解封

  7. (可选)如需在请求标头中保留消息的元数据,请选择写入元数据。您必须启用此选项,才能为邮件设置 Content-Type 标头

  8. 指定端点网址。

  9. 保留所有其他默认值。

  10. 点击创建

gcloud

配置具有载荷解封功能的订阅, HTTP 标头,请运行以下 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper

替换以下内容:

  • SUBSCRIPTION:拉取订阅的名称或 ID。
  • TOPIC:主题的 ID。
  • PUSH_ENDPOINT:要用作此 API 的端点的网址 订阅。例如 https://myproject.appspot.com/myhandler
  • --push-no-wrapper:直接将消息数据作为 HTTP 正文传送。

如需配置包含载荷解封装的订阅并控制 x-goog-pubsub-* 标头的使用,请运行以下命令:

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper \
  --push-no-wrapper-write-metadata
  • --push-no-wrapper-write-metadata:如果为 true,则将 Pub/Sub 消息元数据写入 HTTP 请求的 x-goog-pubsub-<KEY>:<VAL> 标头。写入 Pub/Sub 消息 属性添加到 HTTP 请求的 <KEY>:<VAL> 标头中。

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# endpoint = "https://my-test-project.appspot.com/push"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

no_wrapper = pubsub_v1.types.PushConfig.NoWrapper(write_metadata=True)
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint, no_wrapper=no_wrapper
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
    )

print(f"Push no wrapper subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")
print(f"No wrapper configuration for subscription is: {no_wrapper}")

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

/*
 * Copyright 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package pubsub;


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.PushConfig.NoWrapper;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateUnwrappedPushSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";
    String pushEndpoint = "https://my-test-project.appspot.com/push";

    createPushSubscriptionExample(projectId, subscriptionId, topicId, pushEndpoint);
  }

  public static void createPushSubscriptionExample(
      String projectId, String subscriptionId, String topicId, String pushEndpoint)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
      NoWrapper noWrapper =
          NoWrapper.newBuilder()
              // Determines if message metadata is added to the HTTP headers of
              // the delivered message.
              .setWriteMetadata(true)
              .build();
      PushConfig pushConfig =
          PushConfig.newBuilder().setPushEndpoint(pushEndpoint).setNoWrapper(noWrapper).build();

      // Create a push subscription with default acknowledgement deadline of 10 seconds.
      // Messages not successfully acknowledged within 10 seconds will get resent by the server.
      Subscription subscription =
          subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
      System.out.println("Created push subscription: " + subscription.getName());
    }
  }
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& endpoint) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_push_config()->set_push_endpoint(endpoint);
  request.mutable_push_config()->mutable_no_wrapper()->set_write_metadata(
      true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createPushNoWrapperSubscription creates a push subscription where messages are delivered in the HTTP body.
func createPushNoWrapperSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, endpoint string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// endpoint := "https://my-test-project.appspot.com/push"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
			Wrapper: &pubsub.NoWrapper{
				// Determines if message metadata is added to the HTTP headers of
				// the delivered message.
				WriteMetadata: true,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created push no wrapper subscription: %v\n", sub)
	return nil
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint,
  topicNameOrId,
  subscriptionNameOrId
) {
  const options = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint: string,
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  const options: CreateSubscriptionOptions = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

在邮件中设置内容类型标头

启用载荷解封装后,Pub/Sub 不会在请求中自动设置媒体类型标头字段。如果您 未明确设置 Content-Type 标头字段,则 Web 服务器 在处理请求时,系统可能会将默认值 application/octet-stream 或以意想不到的方式解读请求

如果您需要 Content-Type 标头,请务必明确声明它 与发布的每条消息分别进行更新为此,您必须先启用写入元数据。启用写入元数据的结果 如提供的示例中所示。

后续步骤