Pub/Sub push サブスクリプションのペイロード ラップ解除

Pub/Sub システムをビルドするときに、ペイロードのラップ解除を行うと、標準の Pub/Sub push エンドポイント実装のすべてのシステム要件に準拠していない他のシステムに接続できます。

想定されるペイロードのラップ解除のユースケースには次のようなものがあります。

  • HTTP push エンドポイントに Pub/Sub 固有のメッセージ解析コードを作成する必要はありません。
  • HTTP POST 本文のメタデータではなく、HTTP ヘッダーとして Pub/Sub メッセージのメタデータを受け取ることをおすすめします。
  • たとえば、サードパーティ API にデータを送信する場合に、Pub/Sub メッセージを送信し、Pub/Sub メタデータを除外します。

ペイロードのラップ解除の仕組み

ペイロードのラップ解除は、メッセージ データを除くすべてのメッセージ メタデータの Pub/Sub メッセージを削除する機能です。未加工のメッセージ データを送信すると、サブスクライバーは Pub/Sub のシステム要件に従うことなくメッセージを処理できます。

  • ペイロードのラップ解除では、メッセージ データは HTTP 本文として直接配信されます。
  • ペイロードのラップ解除を行わないと、Pub/Sub は、複数のメッセージ メタデータ フィールドとメッセージ データ フィールドを含む JSON オブジェクトを配信します。この場合、JSON を解析してメッセージ データを取得し、base64 でデコードする必要があります。

メタデータを書き込む

ペイロードのラップ解除を有効にすると、書き込みメタデータ オプションを使用して、以前に削除されたメッセージ メタデータをリクエスト ヘッダーに追加できます。

  • 書き込みメタデータが有効。 メッセージ ヘッダーを再度リクエスト ヘッダーに追加します。また、デコードされた未加工のメッセージ データを配信します。
  • 書き込みメタデータが無効です。 デコードされたメッセージのデータ(未加工)のみを配信します。

書き込みメタデータは、Pub/Sub、Google Cloud CLI の引数 --push-no-wrapper-write-metadata、API プロパティ NoWrapper を使用して公開されます。デフォルトでは、この値は null です。

準備

ラップおよびラップ解除されたメッセージの例

次の例は、ラップされた HTTP メッセージとラップ解除された HTTP メッセージの送信の違いを示しています。これらの例では、メッセージ データには文字列 {"status": "Hello there"} が含まれています。

この例では、ペイロードのラップ解除機能を有効にしてサブスクリプションが作成され、mytopic にメッセージが公開されます。値が some-key の順序指定キーを使用し、メディアタイプが application/json として宣言されています。

gcloud pubsub topics publish mytopic
   --message='{"status": "Hello there"}'
   --ordering-key="some-key"
   --attribute "Content-Type=application/json"

次のセクションでは、ラップされたメッセージとラップ解除されたメッセージの違いを示します。

ラップしたメッセージ

次の例は、標準の Pub/Sub でラップされたメッセージを示しています。この場合、ペイロードのラップ解除は有効になりません。

公開 push エンドポイント受信

data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }

Content-Length: 361
Content-Type: application/json
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{
  "message": {
      "attributes": {
          "Content-Type": "application/json"
      },
      "data": "eyJzdGF0dXMiOiAiSGVsbG8gdGhlcmUifQ==", //  Base64 - {"status": "Hello there"}
      "messageId": "2070443601311540",
      "message_id": "2070443601311540",
      "publishTime": "2021-02-26T19:13:55.749Z",
      "publish_time": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/..."
}

書き込みメタデータが無効になっているラップ解除されたメッセージ

次の例は、メタデータ書き込みオプションを無効にしてラップ解除されたメッセージを示しています。この場合、x-goog-pubsub-* ヘッダーとメッセージ属性は含まれません。

公開 push エンドポイント受信

data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }

Content-Length: 25
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

書き込みメタデータを有効にしたラップ解除されたメッセージ

次の例は、メタデータ書き込みオプションを有効にしてラップ解除されたメッセージを示しています。この場合、x-goog-pubsub-* ヘッダーとメッセージ属性が含まれます。

公開 push エンドポイント受信

data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }

x-goog-pubsub-subscription-name: "projects/myproject/..."
x-goog-pubsub-message-id: "2070443601311540"
x-goog-pubsub-publish-time: "2021-02-26T19:13:55.749Z"
x-goog-pubsub-ordering-key: "some-key"
Content-Type: application/json
Content-Length: 12
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

ペイロードのラップ解除を構成する

Google Cloud コンソールの [サブスクリプションの詳細] ページ、Google Cloud CLI、またはクライアント ライブラリを使用して、サブスクリプションのペイロード ラップ解除プッシュ配信を有効にできます。

コンソール

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    Pub/Sub サブスクリプションを開く

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] フィールドに名前を入力します。

    サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  4. プルダウン メニューからトピックを選択します。サブスクリプションがトピックからメッセージを受信します。

  5. [配信タイプ] で、[push] を選択します。

  6. ペイロードのラップ解除を有効にするには、[ペイロードのラップ解除を有効にする] を選択します。

  7. (省略可)リクエスト ヘッダー内のメッセージのメタデータを保持するには、[メタデータを書き込む] を選択します。メッセージに Content-Type ヘッダーを設定するには、このオプションを有効にする必要があります。

  8. エンドポイント URL を指定します。

  9. 他のすべてのデフォルト値は保持されます。

  10. [作成] をクリックします。

gcloud

標準の HTTP ヘッダーを含むペイロードのラップ解除を使用してサブスクリプションを構成するには、次の gcloud pubsub subscriptions create コマンドを実行します。

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper

以下を置き換えます。

  • SUBSCRIPTION: pull サブスクリプションの名前または ID。
  • TOPIC: トピックの ID。
  • PUSH_ENDPOINT: このサブスクリプションのエンドポイントとして使用する URL。例: https://myproject.appspot.com/myhandler
  • --push-no-wrapper: メッセージ データを HTTP 本文として直接配信します。

ペイロードのラップ解除でサブスクリプションを構成し、x-goog-pubsub-* ヘッダーの使用を制御するには、次のコマンドを実行します。

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper \
  --push-no-wrapper-write-metadata
  • --push-no-wrapper-write-metadata: true の場合、Pub/Sub メッセージのメタデータを HTTP リクエストの x-goog-pubsub-<KEY>:<VAL> ヘッダーに書き込みます。Pub/Sub メッセージ属性を HTTP リクエストの <KEY>:<VAL> ヘッダーに書き込みます。

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# endpoint = "https://my-test-project.appspot.com/push"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

no_wrapper = pubsub_v1.types.PushConfig.NoWrapper(write_metadata=True)
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint, no_wrapper=no_wrapper
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
    )

print(f"Push no wrapper subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")
print(f"No wrapper configuration for subscription is: {no_wrapper}")

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

/*
 * Copyright 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package pubsub;

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.PushConfig.NoWrapper;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateUnwrappedPushSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";
    String pushEndpoint = "https://my-test-project.appspot.com/push";

    createPushSubscriptionExample(projectId, subscriptionId, topicId, pushEndpoint);
  }

  public static void createPushSubscriptionExample(
      String projectId, String subscriptionId, String topicId, String pushEndpoint)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
      NoWrapper noWrapper =
          NoWrapper.newBuilder()
              // Determines if message metadata is added to the HTTP headers of
              // the delivered message.
              .setWriteMetadata(true)
              .build();
      PushConfig pushConfig =
          PushConfig.newBuilder().setPushEndpoint(pushEndpoint).setNoWrapper(noWrapper).build();

      // Create a push subscription with default acknowledgement deadline of 10 seconds.
      // Messages not successfully acknowledged within 10 seconds will get resent by the server.
      Subscription subscription =
          subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
      System.out.println("Created push subscription: " + subscription.getName());
    }
  }
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& endpoint) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_push_config()->set_push_endpoint(endpoint);
  request.mutable_push_config()->mutable_no_wrapper()->set_write_metadata(
      true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createPushNoWrapperSubscription creates a push subscription where messages are delivered in the HTTP body.
func createPushNoWrapperSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, endpoint string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// endpoint := "https://my-test-project.appspot.com/push"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
			Wrapper: &pubsub.NoWrapper{
				// Determines if message metadata is added to the HTTP headers of
				// the delivered message.
				WriteMetadata: true,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created push no wrapper subscription: %v\n", sub)
	return nil
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint,
  topicNameOrId,
  subscriptionNameOrId
) {
  const options = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint: string,
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  const options: CreateSubscriptionOptions = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

メッセージにコンテンツ タイプのヘッダーを設定する

ペイロードのラップ解除を有効にした後、Pub/Sub では、リクエストにメディアタイプのヘッダー フィールドが自動的に設定されることはありません。Content-Type ヘッダー フィールドを明示的に設定しない場合、リクエストを処理するウェブサーバーがデフォルト値の application/octet-stream を設定するか、予期しない方法でリクエストを解釈する場合があります。

Content-Type ヘッダーが必要な場合は、個々のパブリッシュされたメッセージへのパブリッシュ時に明示的に宣言してください。この操作を行う前に、メタデータの書き込みを有効にする必要があります。書き込みメタデータの有効化の結果は、提供されている例に示されています。

次のステップ