Pub/Sub Lite メッセージを Pub/Sub にエクスポートする

このドキュメントでは、Pub/Sub Lite メッセージを Pub/Sub に自動でエクスポートする方法について説明します。

この機能の使用が考えられるシナリオは次のとおりです。

  • Pub/Sub Lite と Pub/Sub を併用する複数のワークロードを相互運用する。
  • Pub/Sub Lite ワークロードを Pub/Sub に移行する。
  • Pub/Sub Lite ベースの既存のアプリケーションから、高度な Pub/Sub 機能(push サブスクリプションやフィルタリングなど)を使用する。
  • 複数のデータ パイプラインを整理統合する。

概要

Pub/Sub Lite から Pub/Sub へメッセージをエクスポートするには、エクスポート サブスクリプションという特別なタイプのサブスクリプションを作成します。エクスポート サブスクリプションは、Lite トピックからメッセージを受信して、Pub/Sub メッセージに変換し、その変換されたメッセージを宛先 Pub/Sub トピックに送信します。

Pub/Sub Lite メッセージのエクスポートの図

Lite トピックでは、エクスポート サブスクリプションと標準サブスクリプションの両方を組み合わせることができます。この 2 つのサブスクリプション タイプは、割り当て使用量予約スループットの点で同じです。エクスポート サブスクリプションは、Lite サブスクリプションのスループット容量を使用し、Pub/Sub のパブリッシュ スループットに対して課金されます。

1 つのエクスポート サブスクリプションでは、1 つの Lite トピックが 1 つの Pub/Sub トピックに接続されます。ただし、1 つの Lite トピックには、複数のエクスポート サブスクリプションを含めて別々の Pub/Sub トピックに接続できます(ファンアウト アーキテクチャ)。複数の Lite トピックから同じ Pub/Sub トピックにエクスポートすることもできます(ファンイン アーキテクチャ)。

Authentication

エクスポート サブスクリプションでは、Pub/Sub Lite リソースと Pub/Sub リソースの両方にアクセスします。エクスポート サブスクリプションを作成するには、次の権限が必要です。

  • pubsublite.subscriptions.create。この権限は、次の事前定義ロールに含まれます。

    • roles/pubsublite.admin
    • roles/pubsublite.editor

    Pub/Sub Lite のアクセス制御をご覧ください。

  • pubsub.topics.get。この権限は、次の事前定義ロールに含まれます。

    • roles/pubsub.admin
    • roles/pubsub.editor
    • roles/pubsub.viewer

    Pub/Sub のアクセス制御をご覧ください。

サービス エージェント

エクスポート サブスクリプションは、ユーザーに代わって Pub/Sub トピックにパブリッシュします。この処理を実行するため、Google が管理するサービス アカウントが使用されます。

プロジェクトで最初のエクスポート サブスクリプションを作成すると、Google が管理する Pub/Sub Lite サービス エージェントが自動的に作成されます。同じプロジェクトに別のエクスポート サブスクリプションを作成する場合は、同じサービス エージェントが使用されます。そのサービス エージェントの名前は、「service-<your_project_number>@gcp-sa-pubsublite.iam.gserviceaccount.com」という形です。

サービス エージェントは、エクスポート サブスクリプションと同じプロジェクト内のすべての Pub/Sub と Pub/Sub Lite トピックに公開する権限を含めて作成されます。宛先の Pub/Sub トピックがエクスポート サブスクリプションと異なるプロジェクトにある場合は、Pub/Sub パブリッシャー ロール(roles/pubsub.publisher)を追加して、サービス エージェントに追加の権限を付与する必要があります。権限は、プロジェクト全体または個々のトピックに付与できます。権限は、最小権限の原則に従って、トピックレベルで付与することをおすすめします。

詳細については、Google Cloud コンソールによるアクセス制御をご覧ください。また、gcloud projects add-iam-policy-binding コマンドを使用して IAM ロールを追加することもできます。

gcloud pubsub topics add-iam-policy-binding TOPIC_NAME \
 --member=serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsublite.iam.gserviceaccount.com
 --role=roles/pubsub.publisher

以下を置き換えます。

  • TOPIC_NAME: IAM ポリシー バインディングを追加する宛先 Pub/Sub トピックの名前。
  • PROJECT_NUMBER: Pub/Sub Lite エクスポート サブスクリプションのプロジェクトのプロジェクト番号。

エクスポート サブスクリプションを作成する

Lite エクスポート サブスクリプションは、Google Cloud コンソール、Google Cloud CLI、または Pub/Sub Lite API を使用して作成できます。

Lite エクスポート サブスクリプションは、接続先の Lite トピックと同じプロジェクトとロケーションに作成される必要があります。Lite トピックを作成するには、Lite トピックを作成して管理するをご覧ください。

Lite トピックにエクスポート サブスクリプションを接続する場合は、Lite トピックにパブリッシュされるすべてのメッセージが Pub/Sub と互換性を持つようにします。詳細については、メッセージの互換性をご覧ください。

エクスポート サブスクリプションを一旦作成すると、それを標準サブスクリプションに変更することはできません。また、その逆も同様です。

Console

  1. [Lite サブスクリプション] ページに移動します。

    [Lite サブスクリプション] に移動

  2. [Lite サブスクリプションを作成] をクリックします。

  3. [Lite サブスクリプション ID] を入力します。

  4. メッセージを受信する Lite トピックを選択します。

  5. [すぐにメッセージを配信] または [保存後にメッセージを配信] を選択します。

  6. [開始オフセット] のタイプを選択します。

  7. [Pub/Sub トピックにエクスポートします] を選択します。

  8. [宛先トピック] リストで、エクスポートされた Lite メッセージを受信する Pub/Sub トピックを選択します。

  9. 省略できます。デッドレター トピックを指定します。

    1. [デッド レタリングを有効にする] チェックボックスをオンにします。
    2. デッドレター トピックとして使用する Lite トピックを選択するか、[LITE トピックを作成] をクリックして新しいデッドレター トピックを作成します。デッドレター トピックは、エクスポート サブスクリプションと同じロケーション(ゾーンまたはリージョン)とプロジェクトに存在する必要があります。
  10. [Create(作成)] をクリックします。

gcloud

エクスポート サブスクリプションを作成するには、gcloud pubsub lite-subscriptions create コマンドを使用します。

gcloud pubsub lite-subscriptions create SUBSCRIPTION_ID \
  --location=LOCATION \
  --topic=TOPIC_ID \
  --export-pubsub-topic=PUBSUB_TOPIC_NAME \
  --export-dead-letter-topic=DEAD_LETTER_TOPIC_ID \
  --export-desired-state=DESIRED_STATE

以下を置き換えます。

  • SUBSCRIPTION_ID: 作成する Lite サブスクリプションの ID。
  • LOCATION: Lite サブスクリプションのロケーション
  • TOPIC_ID: Lite サブスクリプションに接続する Lite トピックの ID。
  • PUBSUB_TOPIC_NAME: エクスポート先の Pub/Sub トピックの名前。トピックが別のプロジェクトにある場合は、完全な名前(projects/my-project-id/topics/my-topic-id)を指定します。
  • DEAD_LETTER_TOPIC_ID: 省略可。デッドレター トピックとして使用する Lite トピックの ID。デッドレター トピックは、エクスポート サブスクリプションと同じロケーション(ゾーンまたはリージョン)とプロジェクトに存在する必要があります。
  • DESIRED_STATE: 省略可。サブスクリプションの開始状態。次の値を使用できます。
    • active: サブスクリプションによって Lite メッセージが Pub/Sub にエクスポートされます。(デフォルト)
    • paused: Lite メッセージのエクスポートが一時停止されます。

リクエストが成功すると、コマンドラインに確認メッセージが表示されます。

Created [SUBSCRIPTION_ID].

プロトコル

Lite エクスポート サブスクリプションを作成するには、次のような POST リクエストを送信します。

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LOCATION/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite サブスクリプションを保存するリージョン。
  • PROJECT_NUMBER: Lite サブスクリプションを作成するプロジェクトのプロジェクト番号。
  • LOCATION: Pub/Sub Lite がサポートするロケーションの名前。
  • SUBSCRIPTION_ID: Lite サブスクリプションの ID。

リクエスト本文に次のフィールドを指定します。

{
  "topic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/TOPIC_ID",
  "deliveryConfig": {
      "deliveryRequirement": "DELIVERY_REQUIREMENT",
  },
  "exportConfig": {
      "desiredState": "DESIRED_STATE",
      "deadLetterTopic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/DEAD_LETTER_TOPIC_ID",
      "pubsubConfig": {
          "topic": "PUBSUB_TOPIC_NAME"
      }
  }
}

以下を置き換えます。

  • DELIVERY_REQUIREMENT: 配信の要件(DELIVER_AFTER_STORED または DELIVER_IMMEDIATELY)。
  • DESIRED_STATE: サブスクリプションの開始状態。次の値を使用できます。
    • ACTIVE: サブスクリプションによって Lite メッセージが Pub/Sub にエクスポートされます。
    • PAUSED: Lite メッセージのエクスポートが一時停止されます。
  • DEAD_LETTER_TOPIC_ID: デッドレター トピックとして使用する既存の Lite トピックの ID。トピックは、エクスポート サブスクリプション自体と同じロケーション(ゾーンまたはリージョン)とプロジェクトに存在する必要があります。
  • PUBSUB_TOPIC_NAME: エクスポート先の Pub/Sub トピックの名前。形式の例: projects/my-project-id/topics/my-topic-id

リクエストが成功した場合のレスポンスは、JSON 形式の Lite サブスクリプションになります。

{
  "deliveryConfig": {
      "deliveryRequirement": "DELIVERY_REQUIREMENT",
  },
  "exportConfig": {
      "desiredState": "DESIRED_STATE",
      "deadLetterTopic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/DEAD_LETTER_TOPIC_ID",
      "pubsubConfig": {
          "topic": "PUBSUB_TOPIC_NAME"
      },
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/subscriptions/SUBSCRIPTION_ID",
  "topic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/TOPIC_ID",
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

func createPubsubExportSubscription(w io.Writer, projectID, region, location, topicID, subID, pubsubTopicID string) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// NOTE: location can be either a region ("us-central1") or a zone ("us-central1-a")
	// For a list of valid locations, see https://cloud.google.com/pubsub/lite/docs/locations.
	// location := "us-central1"
	// NOTE: topic and subscription must be in the same region/zone (e.g. "us-central1-a")
	// topicID := "my-topic"
	// subID := "my-subscription"
	// pubsubTopicID := "destination-topic-id"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initialize the subscription to the oldest retained messages for each
	// partition.
	targetLocation := pubsublite.AtTargetLocation(pubsublite.Beginning)

	sub, err := client.CreateSubscription(ctx, pubsublite.SubscriptionConfig{
		Name:                fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, location, subID),
		Topic:               fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, location, topicID),
		DeliveryRequirement: pubsublite.DeliverImmediately, // Can also be DeliverAfterStored.
		// Configures an export subscription that writes messages to a Pub/Sub topic.
		ExportConfig: &pubsublite.ExportConfig{
			DesiredState: pubsublite.ExportActive, // Can also be ExportPaused.
			Destination: &pubsublite.PubSubDestinationConfig{
				Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, pubsubTopicID),
			},
		},
	}, targetLocation)
	if err != nil {
		return fmt.Errorf("client.CreateSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Created export subscription: %s\n", sub.Name)
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ExportConfig;
import com.google.cloud.pubsublite.proto.ExportConfig.PubSubConfig;
import com.google.cloud.pubsublite.proto.ExportConfig.State;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
import java.util.concurrent.ExecutionException;

public class CreatePubsubExportSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String pubsubTopicId = "destination-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    createPubsubExportSubscriptionExample(
        cloudRegion, zoneId, projectNumber, topicId, subscriptionId, pubsubTopicId, regional);
  }

  public static void createPubsubExportSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      String subscriptionId,
      String pubsubTopicId,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    com.google.pubsub.v1.TopicName pubsubTopicName =
        com.google.pubsub.v1.TopicName.of(String.valueOf(projectNumber), pubsubTopicId);

    Subscription subscription =
        Subscription.newBuilder()
            .setDeliveryConfig(
                // Possible values for DeliveryRequirement:
                // - `DELIVER_IMMEDIATELY`
                // - `DELIVER_AFTER_STORED`
                // You may choose whether to wait for a published message to be successfully written
                // to storage before the server delivers it to subscribers. `DELIVER_IMMEDIATELY` is
                // suitable for applications that need higher throughput.
                DeliveryConfig.newBuilder()
                    .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY))
            .setExportConfig(
                // Configures an export subscription that writes messages to a Pub/Sub topic.
                ExportConfig.newBuilder()
                    // Possible values for State:
                    // - `ACTIVE`: enable message processing.
                    // - `PAUSED`: suspend message processing.
                    .setDesiredState(State.ACTIVE)
                    .setPubsubConfig(
                        PubSubConfig.newBuilder().setTopic(pubsubTopicName.toString())))
            .setName(subscriptionPath.toString())
            .setTopic(topicPath.toString())
            .build();

    // Target location within the message backlog that the subscription should be initialized to.
    SeekTarget initialLocation = SeekTarget.of(BacklogLocation.BEGINNING);

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources, or
    // use "try-with-close" statement to do this automatically.
    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Subscription response = adminClient.createSubscription(subscription, initialLocation).get();
      System.out.println(response.getAllFields() + " created successfully.");
    } catch (ExecutionException e) {
      try {
        throw e.getCause();
      } catch (AlreadyExistsException alreadyExists) {
        System.out.println("This subscription already exists.");
      } catch (Throwable throwable) {
        throwable.printStackTrace();
      }
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsublite import AdminClient, Subscription, ExportConfig
from google.cloud.pubsublite.types import (
    BacklogLocation,
    CloudRegion,
    CloudZone,
    SubscriptionPath,
    TopicPath,
)

def create_lite_pubsub_export_subscription(
    project_number,
    cloud_region="us-central1",
    zone_id="a",
    topic_id="my-topic-id",
    subscription_id="my-subscription-id",
    pubsub_topic_id="destination-topic-id",
    regional=True,
    target_location=BacklogLocation.BEGINNING,
):
    if regional:
        location = CloudRegion(cloud_region)
    else:
        location = CloudZone(CloudRegion(cloud_region), zone_id)

    topic_path = TopicPath(project_number, location, topic_id)
    subscription_path = SubscriptionPath(project_number, location, subscription_id)
    destination_topic_path = PublisherClient.topic_path(project_number, pubsub_topic_id)

    subscription = Subscription(
        name=str(subscription_path),
        topic=str(topic_path),
        delivery_config=Subscription.DeliveryConfig(
            # Possible values for delivery_requirement:
            # - `DELIVER_IMMEDIATELY`
            # - `DELIVER_AFTER_STORED`
            # You may choose whether to wait for a published message to be successfully written
            # to storage before the server delivers it to subscribers. `DELIVER_IMMEDIATELY` is
            # suitable for applications that need higher throughput.
            delivery_requirement=Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY,
        ),
        # Configures an export subscription that writes messages to a Pub/Sub topic.
        export_config=ExportConfig(
            # Possible values for desired_state:
            # - `ACTIVE`: enable message processing.
            # - `PAUSED`: suspend message processing.
            desired_state=ExportConfig.State.ACTIVE,
            pubsub_config=ExportConfig.PubSubConfig(
                topic=destination_topic_path,
            ),
        ),
    )

    # Initialize client that will be used to send requests across threads. This
    # client only needs to be created once, and can be reused for multiple requests.
    client = AdminClient(cloud_region)
    try:
        response = client.create_subscription(subscription, target_location)
        print(f"{response.name} created successfully.")
    except AlreadyExists:
        print(f"{subscription_path} already exists.")

エクスポート サブスクリプションを更新する

Lite サブスクリプションは、Google Cloud コンソール、Google Cloud CLI、または Pub/Sub Lite API を使用して更新できます。新しい設定が適用されるまで、最大で 30 秒かかることがあります。

Console

  1. [Lite サブスクリプション] ページに移動します。

    [Lite サブスクリプション] に移動

  2. [Lite サブスクリプション ID] をクリックします。

  3. [Lite サブスクリプションの詳細] ページで [編集] をクリックします。

gCloud

Lite サブスクリプションを更新するには、gcloud pubsub lite-subscriptions update コマンドを使用します。

gcloud pubsub lite-subscriptions update SUBSCRIPTION_ID \
--location=LOCATION \
--delivery-requirement=DELIVERY_REQUIREMENT \
--export-pubsub-topic=PUBSUB_TOPIC_NAME \
--export-dead-letter-topic=DEAD_LETTER_TOPIC_ID \
--export-desired-state=DESIRED_STATE

以下を置き換えます。

  • SUBSCRIPTION_ID: Lite サブスクリプションの ID。
  • LOCATION: Lite サブスクリプションのロケーション
  • DELIVERY_REQUIREMENT: 省略可。配信の要件(deliver-after-stored または deliver-immediately)。
  • PUBSUB_TOPIC_NAME: 省略可。エクスポート先の Pub/Sub トピックの名前。トピックが別のプロジェクトにある場合は、完全な名前(projects/my-project-id/topics/my-topic-id)を指定します。
  • DEAD_LETTER_TOPIC_ID: デッドレター トピックとして使用する既存の Lite トピックの ID。トピックは、エクスポート サブスクリプション自体と同じロケーション(ゾーンまたはリージョン)とプロジェクトに存在する必要があります。
  • DESIRED_STATE: 省略可。サブスクリプションの目的の状態。次の値を使用できます。
    • active: サブスクリプションによって Lite メッセージが Pub/Sub にエクスポートされます。(デフォルト)
    • paused: Lite メッセージのエクスポートが一時停止されます。

リクエストが成功すると、コマンドラインに Lite サブスクリプションが表示されます。

Updated subscription [SUBSCRIPTION_ID].
deliveryConfig:
deliveryRequirement: DELIVERY_REQUIREMENT
exportConfig:
currentState: DESIRED_STATE
deadLetterTopic: projects/PROJECT_NUMBER/locations/LOCATION/topics/DEAD_LETTER_TOPIC_ID
desiredState: DESIRED_STATE
pubsubConfig:
  topic: PUBSUB_TOPIC_NAME
name: projects/PROJECT_NUMBER/locations/LOCATION/subscriptions/SUBSCRIPTION_ID
topic: projects/PROJECT_NUMBER/locations/LOCATION/topics/TOPIC_ID

プロトコル

Lite サブスクリプションを更新するには、次のような PATCH リクエストを送信します。

PATCH https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LOCATION/subscriptions/SUBSCRIPTION_ID?updateMask=deliveryConfig.deliveryRequirement,exportConfig
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite サブスクリプションが作成されたリージョン。
  • PROJECT_NUMBER: Lite サブスクリプションが作成されたプロジェクトのプロジェクト番号。
  • LOCATION: Lite サブスクリプションが作成されたロケーション。
  • SUBSCRIPTION_ID: Lite サブスクリプションの ID。

リクエスト本文に次のフィールドを指定します。

{
  "deliveryConfig": {
      "deliveryRequirement": "DELIVERY_REQUIREMENT",
  },
  "exportConfig": {
      "desiredState": "DESIRED_STATE",
      "deadLetterTopic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/DEAD_LETTER_TOPIC_ID",
      "pubsubConfig": {
          "topic": "PUBSUB_TOPIC_NAME"
      }
  }
}

以下を置き換えます。

  • DELIVERY_REQUIREMENT: 配信の要件(DELIVER_AFTER_STORED または DELIVER_IMMEDIATELY)。
  • DESIRED_STATE: サブスクリプションの目的の状態。次の値を使用できます。
    • ACTIVE: サブスクリプションによって Lite メッセージが Pub/Sub にエクスポートされます。
    • PAUSED: Lite メッセージのエクスポートが一時停止されます。
  • DEAD_LETTER_TOPIC_ID: デッドレター トピックとして使用する既存の Lite トピックの ID。トピックは、エクスポート サブスクリプション自体と同じロケーション(ゾーンまたはリージョン)とプロジェクトに存在する必要があります。
  • PUBSUB_TOPIC_NAME: 宛先 Pub/Sub トピックの名前。形式の例: projects/my-project-id/topics/my-topic-id

リクエストが成功した場合のレスポンスは、JSON 形式の Lite サブスクリプションになります。

{
"deliveryConfig": {
  "deliveryRequirement": "DELIVERY_REQUIREMENT",
},
"exportConfig": {
  "desiredState": "DESIRED_STATE",
  "deadLetterTopic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/DEAD_LETTER_TOPIC_ID",
  "pubsubConfig": { "topic": "PUBSUB_TOPIC_NAME" }
},
"name": "projects/PROJECT_NUMBER/locations/LOCATION/subscriptions/SUBSCRIPTION_ID",
"topic": "projects/PROJECT_NUMBER/locations/LOCATION/topics/TOPIC_ID",
}

エクスポート サブスクリプションを一時停止または開始する

エクスポート サブスクリプションには、「目的の状態」という設定があり、これは次の 2 つの値のいずれかです。

  • Active: サブスクリプションにより、Lite メッセージが Pub/Sub にエクスポートされます。
  • Paused: Lite メッセージのエクスポートが一時停止されます。

Google Cloud コンソールで目的の状態を変更するには:

  1. [Lite サブスクリプション] ページに移動します。

    [Lite サブスクリプション] に移動

  2. [Lite サブスクリプション ID] をクリックします。

  3. [Lite サブスクリプションの詳細] ページで [一時停止] または [開始] をクリックします。

目的の状態は、Google Cloud CLI または Pub/Sub Lite API を使用して更新することもできます。エクスポート サブスクリプションを更新するをご覧ください。

おすすめの方法

このセクションでは、エクスポート サブスクリプションを使用する場合のベスト プラクティスをいくつか説明します。

予約

サブスクリプションのスループット容量を明示的に設定するのではなく、予約でエクスポート サブスクリプションを使用することをおすすめします。

メッセージの互換性

Pub/Sub Lite メッセージに Pub/Sub との互換性がない場合、エクスポート サブスクリプションはメッセージを Pub/Sub にパブリッシュしません。代わりに、メッセージがデッドレター トピックに配置されます(デッドレター トピック割り当てられている場合)。デッドレター トピックが割り当てられなかった場合、互換性のないメッセージは破棄されます。

Lite トピックにメッセージをパブリッシュする際は、次の互換性の問題に注意してください。

  • キー。Pub/Sub Lite キーのタイプは bytes ですが、Pub/Sub の順序指定キーのタイプは string です。互換性を確保するため、Pub/Sub Lite キーは UTF-8 文字のみで構成する必要があります。

  • 属性。メッセージ属性の要件は次のとおりです。

    • 互換性を保つため、すべての Pub/Sub Lite メッセージ属性には単一の値を設定する必要があります。Pub/Sub Lite では、複数の値を持つメッセージ属性がサポートされていますが、Pub/Sub は単一値の属性のみをサポートしています。
    • メッセージ属性は、Pub/Sub メッセージの上限(メッセージあたりの属性の最大数、属性あたりのキーと値の最大サイズなど)を超えないようにする必要があります。

デッドレター トピック

互換性のないメッセージを保持して扱うには、デッドレター トピックの使用をおすすめします。デッドレター トピックは、エクスポート サブスクリプションの作成時に割り当てることができます。また、デッドレター トピックを使用するように既存のエクスポート サブスクリプションを更新することもできます。サブスクリプションが Pub/Sub と互換性のないメッセージを受信すると、そのメッセージはデッドレター トピックにパブリッシュされます。

デッドレター トピックは、標準の Pub/Sub Lite トピックです。それは、エクスポート サブスクリプションと同じロケーションとプロジェクトに存在する必要があり、ソーストピックとは異なるトピックでなければなりません。

通常、デッドレター トピックのスループット使用率は低くなります。したがって、トピックにスループットを割り当てるのではなく、デッドレター トピックに予約を割り当てることをおすすめします。

配信エラー

エクスポート サブスクリプションは、互換性のあるすべてのメッセージを宛先 Pub/Sub トピックに配信しようとします。メッセージ配信が失敗すると、エクスポート サブスクリプションは一時停止されます。エラーのカテゴリを確認するには、subscription/export_status 指標を確認します。エラーを表す値は次のとおりです。

  • PERMISSION_DENIED: メッセージをエクスポートするための十分な権限がありません。
  • NOT_FOUND: 1 つ以上のリソースが見つかりませんでした(例: 宛先トピックが存在しません)。

トラブルシューティングの詳細については、エクスポート サブスクリプションのトラブルシューティングをご覧ください。

このエラーを解決すると、エクスポート サブスクリプションは、定期的な再試行により自動的に再起動されます。

料金

エクスポート サブスクリプションが消費する Pub/Sub Lite と Pub/Sub リソースに対しては課金されます。具体的には、Pub/Sub Lite トピックに対して構成され、Pub/Sub Lite サブスクリプションで割り当てられたサブスクリプションのスループットとストレージに対して課金されます。また、宛先の Pub/Sub トピックへのパブリッシュに対しても課金されます。Pub/Sub の料金をご覧ください。

エクスポート機能の使用に追加料金はかかりません。Pub/Sub Lite エクスポート サブスクリプションと標準サブスクリプションに料金の違いはありません。

エクスポート サブスクリプションのトラブルシューティング

このセクションでは、エクスポート サブスクリプションに関するトラブルシューティングのヒントをいくつか説明します。

エクスポート サブスクリプションが一時停止されています

サブスクリプションが一時停止されていると、メッセージはエクスポートされません。

この問題を検出するには:

  • Google Cloud コンソール: サブスクリプションの詳細を表示します。サブスクリプションが一時停止されると、目的の状態現在の状態Paused になります。

  • 指標: subscription/export_status 指標は PAUSED です。

この問題を解決するには、サブスクリプションを開始します。

宛先トピックまたはデッドレター トピックが削除されました

エクスポート サブスクリプションに接続されている Pub/Sub トピックを削除するか、デッドレター トピックを削除すると、エラーが発生します。

この問題を検出するには:

  • Google Cloud コンソール: サブスクリプションの詳細を表示します。トピックが削除された場合、[現在の状態] は Not found になります。

  • 指標: subscription/export_status 指標。トピックが削除された場合、値は NOT_FOUND になります。

この問題を解決するには、宛先の Pub/Sub トピックとデッドレター トピック(構成されている場合)を確認します。

宛先の Pub/Sub が削除された場合は、同じ名前でトピックを再作成します。権限は変更されませんが、エクスポート サブスクリプションはパブリッシュを再開します。

デッドレター トピックが削除された場合は、新しいデッドレター トピックを作成し、それを参照するようエクスポート サブスクリプションを更新します。

互換性のないメッセージ

Pub/Sub との互換性がないメッセージの場合は、エクスポートされません。

この問題を検出するには:

  • 指標: subscription/unexportable_message_count 指標は、エクスポートできなかった互換性のないメッセージの数を示します。

この問題を解決するには、デッドレター トピックを使用して互換性のないメッセージを保持します。メッセージを調べて原因を特定し、必要に応じて変換して再度パブリッシュします。メッセージの互換性をご覧ください。

エクスポートが抑制されています

この問題を検出するには:

  • 指標: subscription/flow_control_status 指標は、フロー制御の理由として、未処理のバイト数またはメッセージ数がパーティションあたりの上限に達したことを表す NO_CLIENT_TOKENS を示します。この問題が解決するまで、関連するエクスポート サブスクリプションのバックログは増加します。

このエラーの根本原因は、複数のことが考えられます。想定されるほとんどの原因はバックエンドで発生しますが、次の点を確認してください。

  • 同じキーを共有する Lite メッセージを、キーごとに 1 MiB/秒未満のレートでパブリッシュしてください。エクスポート サブスクリプションは、Lite メッセージキーを Pub/Sub 順序指定キーとして書き込み、Pub/Sub には各順序指定キーに対して 1 MiB/秒の上限があります。この上限を超えると、スロットリングが発生する可能性があります。

ユーザーにこの操作を実行する権限がありません

Pub/Sub Lite サービス エージェントには、宛先の Pub/Sub トピックへのパブリッシュ権限が必要です。

この問題を検出するには:

  • Google Cloud コンソール: サブスクリプションの詳細を表示します。権限エラーがある場合、[現在の状態] は Permission denied になります。

  • 指標: subscription/export_status 指標は PERMISSON_DENIED です。

たとえば、このエラーの原因として次のような状況が考えられます。

  • Pub/Sub Lite サービス エージェントに、別のプロジェクトの宛先 Pub/Sub トピックにメッセージをパブリッシュするための適切な権限またはロールがない。
  • サービス エージェントが、エクスポート サブスクリプションの親プロジェクトの IAM ポリシーから削除された。
  • Pub/Sub Lite サービス エージェントが、まだ設定の途中にある。サービス エージェントは、プロジェクトで最初のエクスポート サブスクリプションを作成すると自動的に作成されます。この権限エラーは、10 分以内に自動的に解決されます。

この問題を解決するには、サービス エージェントに正しい権限またはロールが付与されているかどうかを確認します。サービス エージェントをご覧ください。

次のステップ

Pub/Sub または Pub/Sub Lite の選択