Pub/Sub push サブスクリプションのペイロードのラップ解除

Pub/Sub システムを構築するときにペイロードのアンラッピングを行うと、標準の Pub/Sub プッシュ エンドポイントの実装のすべてのシステム要件を満たしていない他のシステムに接続できます。

想定されるペイロードのラップ解除のユースケースには次のようなものがあります。

  • HTTP push エンドポイントに Pub/Sub 固有のメッセージ解析コードを作成する必要はありません。
  • HTTP POST 本文のメタデータではなく、HTTP ヘッダーとして Pub/Sub メッセージのメタデータを受信することを希望している。
  • たとえば、サードパーティ API にデータを送信する場合に、Pub/Sub メッセージを送信し、Pub/Sub メタデータを除外します。

ペイロードのラップ解除を行う方法

ペイロードのラップ解除は、Pub/Sub メッセージからメッセージ データ以外のすべてのメッセージ メタデータを削除する機能です。サブスクライバーは、未加工のメッセージ データを送信することで、Pub/Sub のシステム要件に準拠することなくメッセージを処理できます。

  • ペイロードのアンラッピングでは、メッセージ データが HTTP 本文として直接配信されます。
  • ペイロードのラップ解除を行わない場合、Pub/Sub は複数のメッセージ メタデータ フィールドとメッセージ データ フィールドを含む JSON オブジェクトを配信します。この場合、JSON を解析してメッセージ データを取得し、base64 でデコードする必要があります。

メタデータを書き込む

ペイロードのラップ解除を有効にすると、メタデータを書き込むオプションを使用できます。これにより、以前に削除したメッセージ メタデータをリクエスト ヘッダーに追加できます。

  • メタデータの書き込みが有効になっている。メッセージ メタデータをリクエスト ヘッダーに追加します。また、デコードされたメッセージのデータ(未加工)を配信します。
  • メタデータの書き込みが無効になっている。デコードされたメッセージのデータ(未加工)のみを配信します。

書き込みメタデータは、Pub/Sub、Google Cloud CLI 引数 --push-no-wrapper-write-metadata、API プロパティ NoWrapper を介して公開されます。デフォルトでは、この値は null です。

準備

ラップされたメッセージとラップされていないメッセージの例

次の例は、ラップされた HTTP メッセージとラップされていない HTTP メッセージの送信の違いを示しています。これらの例では、メッセージデータに文字列 {"status": "Hello there"} が含まれています。

この例では、ペイロードのラップ解除機能を有効にしてサブスクリプションを作成し、mytopic にメッセージをパブリッシュします。値が some-key の順序キーを使用し、メディアタイプは application/json として宣言されています。

gcloud pubsub topics publish mytopic
   --message='{"status": "Hello there"}'
   --ordering-key="some-key"
   --attribute "Content-Type=application/json"

以降のセクションでは、ラップされたメッセージとラップされていないメッセージの違いについて説明します。

折り返されたメッセージ

次の例は、標準の Pub/Sub ラップされたメッセージを示しています。この場合、ペイロードのラップ解除は有効になりません。

公開 Push エンドポイントの受信
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 361
Content-Type: application/json
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{
  "message": {
      "attributes": {
          "Content-Type": "application/json"
      },
      "data": "eyJzdGF0dXMiOiAiSGVsbG8gdGhlcmUifQ==", //  Base64 - {"status": "Hello there"}
      "messageId": "2070443601311540",
      "message_id": "2070443601311540",
      "publishTime": "2021-02-26T19:13:55.749Z",
      "publish_time": "2021-02-26T19:13:55.749Z"
  },
  "subscription": "projects/myproject/..."
}

書き込みメタデータを無効にしたメッセージのラップ解除

次の例は、書き込みメタデータ オプションが無効になっているラップ解除されたメッセージを示しています。この場合、x-goog-pubsub-* ヘッダーとメッセージ属性は含まれません。

公開 Push エンドポイントの受信
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
Content-Length: 25
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

書き込みメタデータを有効にしたメッセージのラップ解除

次の例は、書き込みメタデータ オプションが有効になっているラップ解除されたメッセージを示しています。この場合、x-goog-pubsub-* ヘッダーとメッセージ属性が含まれます。

公開 Push エンドポイントの受信
data="{"status": "Hello there"}"
ordering_key="some-key"
attributes=
  {
     {"Content-Type", "application/json"}
  }
x-goog-pubsub-subscription-name: "projects/myproject/..."
x-goog-pubsub-message-id: "2070443601311540"
x-goog-pubsub-publish-time: "2021-02-26T19:13:55.749Z"
x-goog-pubsub-ordering-key: "some-key"
Content-Type: application/json
Content-Length: 12
User-Agent: CloudPubSub-Google
Host: subscription-project.uc.r.appspot.com

{"status": "Hello there"}

ペイロードのラップ解除を構成する

サブスクリプションに対してペイロードのラップ解除のプッシュ配信を有効にするには、Google Cloud コンソールの [サブスクリプションの詳細] ページ、Google Cloud CLI、またはクライアント ライブラリを使用します。

Console

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    Pub/Sub サブスクリプションを開く

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] フィールドに名前を入力します。

    サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  4. プルダウン メニューからトピックを選択します。サブスクリプションがトピックからメッセージを受信します。

  5. [配信タイプ] で、[push] を選択します。

  6. ペイロードのラップ解除を有効にするには、[Enable payload unwrapping] を選択します。

  7. (省略可)リクエスト ヘッダーにメッセージのメタデータを保持するには、[メタデータを書き込む] を選択します。メッセージの Content-Type ヘッダーを設定するには、このオプションを有効にする必要があります。

  8. エンドポイント URL を指定します。

  9. 他のすべてのデフォルト値は保持されます。

  10. [作成] をクリックします。

gcloud

標準の HTTP ヘッダーを含むペイロードのアンラッピングを含むサブスクリプションを構成するには、次の gcloud pubsub subscriptions create コマンドを実行します。

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper

以下を置き換えます。

  • SUBSCRIPTION: pull サブスクリプションの名前または ID。
  • TOPIC: トピックの ID。
  • PUSH_ENDPOINT: このサブスクリプションのエンドポイントとして使用する URL。例: https://myproject.appspot.com/myhandler
  • --push-no-wrapper: メッセージ データを HTTP 本文として直接配信します。

ペイロードのアンラッピングを使用してサブスクリプションを構成し、x-goog-pubsub-* ヘッダーの使用を制御するには、次のコマンドを実行します。

gcloud pubsub subscriptions create SUBSCRIPTION \
  --topic TOPIC \
  --push-endpoint=PUSH_ENDPOINT \
  --push-no-wrapper \
  --push-no-wrapper-write-metadata
  • --push-no-wrapper-write-metadata: true の場合、Pub/Sub メッセージのメタデータを HTTP リクエストの x-goog-pubsub-<KEY>:<VAL> ヘッダーに書き込みます。Pub/Sub メッセージ属性を HTTP リクエストの <KEY>:<VAL> ヘッダーに書き込みます。

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# endpoint = "https://my-test-project.appspot.com/push"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

no_wrapper = pubsub_v1.types.PushConfig.NoWrapper(write_metadata=True)
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint, no_wrapper=no_wrapper
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "push_config": push_config,
        }
    )

print(f"Push no wrapper subscription created: {subscription}.")
print(f"Endpoint for subscription is: {endpoint}")
print(f"No wrapper configuration for subscription is: {no_wrapper}")

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

/*
 * Copyright 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package pubsub;


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.PushConfig.NoWrapper;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateUnwrappedPushSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";
    String pushEndpoint = "https://my-test-project.appspot.com/push";

    createPushSubscriptionExample(projectId, subscriptionId, topicId, pushEndpoint);
  }

  public static void createPushSubscriptionExample(
      String projectId, String subscriptionId, String topicId, String pushEndpoint)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
      NoWrapper noWrapper =
          NoWrapper.newBuilder()
              // Determines if message metadata is added to the HTTP headers of
              // the delivered message.
              .setWriteMetadata(true)
              .build();
      PushConfig pushConfig =
          PushConfig.newBuilder().setPushEndpoint(pushEndpoint).setNoWrapper(noWrapper).build();

      // Create a push subscription with default acknowledgement deadline of 10 seconds.
      // Messages not successfully acknowledged within 10 seconds will get resent by the server.
      Subscription subscription =
          subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
      System.out.println("Created push subscription: " + subscription.getName());
    }
  }
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& endpoint) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_push_config()->set_push_endpoint(endpoint);
  request.mutable_push_config()->mutable_no_wrapper()->set_write_metadata(
      true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createPushNoWrapperSubscription creates a push subscription where messages are delivered in the HTTP body.
func createPushNoWrapperSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, endpoint string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// endpoint := "https://my-test-project.appspot.com/push"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
			Wrapper: &pubsub.NoWrapper{
				// Determines if message metadata is added to the HTTP headers of
				// the delivered message.
				WriteMetadata: true,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created push no wrapper subscription: %v\n", sub)
	return nil
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint,
  topicNameOrId,
  subscriptionNameOrId
) {
  const options = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const pushEndpoint = 'YOUR_ENDPOINT_URL';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createPushSubscriptionNoWrapper(
  pushEndpoint: string,
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  const options: CreateSubscriptionOptions = {
    pushConfig: {
      // Set to an HTTPS endpoint of your choice. If necessary, register
      // (authorize) the domain on which the server is hosted.
      pushEndpoint,
      // When true, writes the Pub/Sub message metadata to
      // `x-goog-pubsub-<KEY>:<VAL>` headers of the HTTP request. Writes the
      // Pub/Sub message attributes to `<KEY>:<VAL>` headers of the HTTP request.
      noWrapper: {
        writeMetadata: true,
      },
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

メッセージにコンテンツ タイプのヘッダーを設定する

ペイロードのラップ解除を有効にしても、Pub/Sub はリクエストにメディアタイプ ヘッダー フィールドを自動的に設定しません。Content-Type ヘッダー フィールドを明示的に設定しない場合、リクエストを処理するウェブサーバーがデフォルト値の application/octet-stream を設定するか、予期しない方法でリクエストを解釈する場合があります。

Content-Type ヘッダーが必要な場合は、公開時に公開されるすべてのメッセージに対して明示的に宣言してください。これを行うには、まずメタデータの書き込みを有効にする必要があります。メタデータの書き込みを有効にした結果は、関連する例に示してあります。

次のステップ