Cloud Storage サブスクリプションを作成する

このドキュメントでは、Cloud Storage サブスクリプションの作成方法について説明します。Cloud Storage サブスクリプションを作成するには、Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用できます。

準備

このドキュメントを読む前に、次の内容をよく理解しておいてください。

必要なロールと権限

ロールと権限に関するガイドラインは次の一覧に示すとおりです。

  • サブスクリプションを作成するには、プロジェクト レベルでアクセス制御を構成する必要があります。

  • このセクションで後述するように、サブスクリプションとトピックが異なるプロジェクトにある場合は、リソースレベルの権限も必要です。

  • Cloud Storage サブスクリプションを作成するには、Pub/Sub サービス アカウントに、特定の Cloud Storage バケットへの書き込みとバケットのメタデータの読み取りを行う権限が付与されている必要があります。これらの権限を付与する方法について詳しくは、このドキュメントの次のセクションをご覧ください。

Cloud Storage サブスクリプションの作成に必要な権限を取得するには、管理者にプロジェクトに対する Pub/Sub 編集者roles/pubsub.editor)IAM ロールを付与するよう依頼してください。ロールの付与の詳細については、アクセス権の管理をご覧ください。

この事前定義ロールには、Cloud Storage サブスクリプションの作成に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Cloud Storage サブスクリプションを作成するには、次の権限が必要です。

  • サブスクリプションを作成する: pubsub.subscriptions.create
  • サブスクリプションをトピックに接続する: pubsub.topics.attachSubscription
  • サブスクリプションから pull する: pubsub.subscriptions.consume
  • サブスクリプションを取得する: pubsub.subscriptions.get
  • サブスクリプションを一覧表示する: pubsub.subscriptions.list
  • サブスクリプションを更新する: pubsub.subscriptions.update
  • サブスクリプションを削除する: pubsub.subscriptions.delete
  • サブスクリプションの IAM ポリシーを取得する: pubsub.subscriptions.getIamPolicy
  • サブスクリプションの IAM ポリシーを構成する: pubsub.subscriptions.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

別のプロジェクトのトピックに関連付けられているプロジェクトで Cloud Storage サブスクリプションを作成する必要がある場合は、トピック管理者にトピックに対する Pub/Sub 編集者の (roles/pubsub.editor) IAM ロールも付与するよう依頼してください。

Pub/Sub サービス アカウントに Cloud Storage のロールを割り当てる

一部の Google Cloud サービスには、Google Cloud 管理のサービス アカウントがあり、このサービスを利用してユーザーのリソースにアクセスできます。これらのサービス アカウントは、サービス エージェントとも呼ばれます。Pub/Sub では、service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com 形式でのプロジェクトごとにサービス アカウントを作成し、維持します。

Cloud Storage サブスクリプションを作成するには、Pub/Sub サービス アカウントに、特定の Cloud Storage バケットへの書き込みとバケットのメタデータの読み取りを行う権限が付与されている必要があります。次のいずれかの手順のうち 1 つを選択します。

  • バケット レベルで権限を付与します。特定の Cloud Storage バッケトでは、Storage Object Creator (roles/storage.objectCreator) のロールと Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) のロールを Pub/Sub サービス アカウントに付与します。

  • プロジェクト レベルでロールを付与する必要がある場合は、代わりに Cloud Storage バケットを含むプロジェクトでストレージ管理者(roles/storage.admin)のロールを付与できます。Pub/Sub サービス アカウントに このロールを付与します。

バケットの権限

次の手順を行い、バケットレベルで Storage Object Creator(roles/storage.objectCreator)のロールと Storage Legacy Bucket Reader(roles/storage.legacyBucketReader)のロールを付与します。

  1. Google Cloud コンソールで [Cloud Storage] ページに移動します。

    [Cloud Storage] に移動

  2. メッセージを書き込みたい Cloud Storage バケットをクリックします。

    [バケットの詳細] ページが開きます。

  3. [バケットの詳細] ページで、[権限] タブをクリックします。

  4. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

    [アクセス権を付与] ページが開きます。

  5. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

    サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com になります。

  6. [ロールを割り当て] > [ロールを選択] プルダウンで、「Creator」と入力し、[Storage オブジェクト作成者] ロールを選択します。

  7. [別のロールを追加] をクリックします。

  8. [ロールを選択] プルダウンで、Bucket Reader を入力し、[Storage Legacy Bucket Reader] のロールを選択します。

  9. [保存] をクリックします。

プロジェクトの権限

次の手順を行って、プロジェクト レベルでストレージ管理者(roles/storage.admin)ロールを付与します。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    IAM に移動

  2. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

    [アクセス権を付与] ページが開きます。

  3. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

    サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com になります。

  4. [ロールを割り当て] > [ロールを選択] プルダウンで、「Storage Admin」と入力し、[ストレージ管理者] ロールを選択します。

  5. [保存] をクリックします。

Cloud Storage IAM の詳細については、Cloud Storage の Identity and Access Management をご覧ください。

Cloud Storage のサブスクリプション プロパティ

Cloud Storage サブスクリプションを構成する場合は、すべてのサブスクリプション タイプに共通のプロパティと、追加の Cloud Storage サブスクリプション固有のプロパティを指定する必要があります。

共通のサブスクリプション プロパティ

すべてのサブスクリプションで設定できる共通のサブスクリプション プロパティについて学習します。

バケット名

Cloud Storage サブスクリプションを作成する前に、Cloud Storage バケットがすでに存在している必要があります。

メッセージはバッチとして送信され、Cloud Storage バケットに保存されます。 単一のバッチまたはファイルは、バケットにオブジェクトとして保存されます。

Cloud Storage バケットでは、リクエスト元による支払いを無効にする必要があります。

Cloud Storage バケットを作成するには、バケットの作成をご覧ください。

ファイル名の接頭辞、接尾辞、日時

Cloud Storage サブスクリプションによって生成された Cloud Storage 出力ファイルは、オブジェクトとして Cloud Storage バケットに保存されます。Cloud Storage バケットに格納されているオブジェクトの名前は、<file-prefix><UTC-date-time>_<uuid><file-suffix> の形式になります。

次のリストに、ファイル形式とカスタマイズ可能なフィールドの詳細を示します。

  • <file-prefix> は、カスタム ファイル名の接頭辞です。このフィールドは省略できます。

  • <UTC-date-time> は、オブジェクトの作成時間に基づいて自動生成されるカスタマイズ可能な文字列です。

  • <uuid> は、オブジェクトに対して自動生成されるランダムな文字列です。

  • <file-suffix> は、カスタム ファイル名の接尾辞です。このフィールドは省略できます。ファイル名の接尾辞を「/」で終わることはできません。

  • ファイル名の接頭辞と接尾辞を変更できます。

    • たとえば、ファイル名接頭辞の値が prod_ で、ファイル名接尾辞の値が _archive の場合、サンプル オブジェクト名は prod_2023-09-25T04:10:00+00:00_uN1QuE_archive です。

    • ファイル名の接頭辞と接尾辞を指定しない場合、Cloud Storage バケットに保存されるオブジェクト名は <UTC-date-time>_<uuid> の形式になります。

    • Cloud Storage オブジェクトの命名要件は、ファイル名の接頭辞と接尾辞にも適用されます。詳細については、Cloud Storage オブジェクトについてをご覧ください。

  • ファイル名での日時の表示方法を変更できます。

    • 1 回だけ使用できる必須の日時マッチャー: 年(YYYY または YY)、月(MM)、日(DD)、時(hh)、分(mm)、秒(ss)。たとえば、YY-YYYYMMM は無効です。

    • 1 回だけ使用できる任意のマッチャー: 日時区切り文字(T)とタイムゾーン オフセット(Z または +00:00)。

    • 複数回使用できる省略可能な要素: ハイフン(-)、アンダースコア(_)、コロン(:)、転送スラッシュ(/)です。

    • たとえば、ファイル名の日時形式の値が YYYY-MM-DD/hh_mm_ssZ の場合、サンプル オブジェクト名は prod_2023-09-25/04_10_00Z_uNiQuE_archive です。

    • ファイル名の日時形式が matcher ではない文字で終わる場合、その文字が <UTC-date-time><uuid> の間の区切り文字に置き換えられます。たとえば、ファイル名の日時形式の値が YYYY-MM-DDThh_mm_ss- の場合、サンプル オブジェクト名は prod_2023-09-25T04_10_00-uNiQuE_archive です。

ファイルのバッチ処理

Cloud Storage サブスクリプションでは、Cloud Storage バケットにオブジェクトとして保存される新しい出力ファイルを作成するタイミングを指定できます。Pub/Sub は、指定されたバッチ処理条件のいずれかに一致すると、出力ファイルを書き込みます。Cloud Storage のバッチ処理条件は次のとおりです。

  • ストレージのバッチ最大時間。これは必須の設定です。指定した最大時間の値を超えると、Cloud Storage サブスクリプションが新しい出力ファイルを書き込みます。値を指定しない場合、デフォルト値の 5 分が適用されます。 最大時間に適用可能な値は次のとおりです。

    • 最小値: 1 分
    • デフォルト値 = 5 分
    • 最大値: 10 分。
  • ストレージのバッチ最大バイト数。この設定は省略できます。指定した最大バイトの値が超過すると、Cloud Storage サブスクリプションは新しい出力ファイルを書き込みます。最大バイトに使用可能な値は次のとおりです。

    • 最小値: 1 KB
    • 最大値 = 10 GiB

たとえば、最大時間を 6 分、最大バイトを 2 GB に構成できます。4 分の時点で出力ファイルのファイルサイズが 2 GB に達すると、Pub/Sub は前のファイルをファイナライズして、新しいファイルへの書き込みを開始します。

Cloud Storage サブスクリプションでは、Cloud Storage バケット内の複数のファイルに同時に書き込む場合があります。6 分ごとに新しいファイルを作成するようにサブスクリプションを構成した場合、6 分ごとに作成される複数の Cloud Storage ファイルを目にすることがあります。

場合によっては、ファイルのバッチ処理の条件で構成されているよりも前に、Pub/Sub が新しいファイルへの書き込みを開始することがあります。 サブスクリプションが最大バイト数の値より大きいメッセージを受信した場合、ファイルが最大バイト数の値を超えることもあります。

ファイル形式

Cloud Storage サブスクリプションを作成するときに、Cloud Storage バケットに格納される出力ファイルの形式を テキストまたは Avro として指定できます。

  • テキスト: メッセージは書式なしテキストとして保存されます。改行文字は、あるメッセージをファイル内の前のメッセージから区切るものです。メッセージ ペイロードのみが保存され、属性やその他のメタデータは保存されません。

  • Avro: メッセージは Apache Avro バイナリ形式で保存されます。

    [Avro] を選択すると、[メタデータを書き込む] オプションをさらに有効にできます。このオプションを使用すると、メッセージとともにメッセージのメタデータを保存できます。

    subscription_namemessage_idpublish_time などのメタデータと属性のフィールドが、出力 Avro オブジェクトの最上位フィールドに書き込まれ、データ以外のすべてのメッセージプロパティ(たとえば、存在する場合 ordering_key)は、属性マップのエンティティとして追加されます。

    メタデータの書き込みが無効になっている場合、メッセージ ペイロードのみが出力 Avro オブジェクトに書き込まれます。

    メタデータの書き込みを有効にしていない出力メッセージの Avro スキーマは次のとおりです。

    {
      "type": "record",
      "namespace": "com.google.pubsub",
      "name": "PubsubMessage",
      "fields": [
        { "name": "data", "type": "bytes" }
      ]
    }
    

    [メタデータの書き込み] を有効にした出力メッセージの Avro スキーマは次のとおりです。

    {
      "type": "record",
      "namespace": "com.google.pubsub",
      "name": "PubsubMessageWithMetadata",
      "fields": [
        { "name": "subscription_name", "type": "string" },
        { "name": "message_id", "type": "string"  },
        { "name": "publish_time", "type": {
            "type": "long",
            "logicalType": "timestamp-micros"
          }
        },
        { "name": "attributes", "type": { "type": "map", "values": "string" } },
        { "name": "data", "type": "bytes" }
      ]
    }
    

Cloud Storage サブスクリプションの作成

コンソール

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    サブスクリプションに移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] フィールドに名前を入力します。

    サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  4. プルダウン メニューからトピックを選択するか、作成します。

    サブスクリプションがトピックからメッセージを受信します。

    トピックの作成方法については、トピックを作成、管理するをご覧ください。

  5. [Cloud Storage への書き込み] として [配信タイプ] を選択します。

  6. Cloud Storage バケットの場合は、[ブラウズ] をクリックします。

    • 適切なプロジェクトから既存のバケットを選択できます。

    • 作成アイコンをクリックし、画面の指示に沿って新しいバケットを作成することもできます。

      バケットを作成したら、Cloud Storage サブスクリプションのバケットを選択します。

      バケットを作成する方法の詳細については、バケットを作成するをご覧ください。

    バケットを指定すると、Pub/Sub がバケットに対して Pub/Sub サービス アカウントの適切な権限を確認します。権限の問題がある場合は、次のようなメッセージ(Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions)が表示されます。

  7. 権限の問題が発生した場合は、[権限を設定] をクリックし、画面上の指示に従います。

    または、Pub/Sub サービス アカウントに Cloud Storage のロールを割り当てるの手順に従います。

  8. [ファイル形式] の場合、[テキスト] または [Avro] を選択します。

    [Avro] を選択した場合、必要に応じて、出力にメッセージ メタデータを保存するかどうか指定することもできます。

    Avro 形式のメッセージ メタデータ オプションを含む 2 つのオプションの詳細については、ファイル形式をご覧ください。

  9. 省略可: Cloud Storage バケットに書き込まれるすべてのファイルのファイル名の接頭辞と接尾辞を指定できます。ファイルは、バケットにオブジェクトとして保存されます。

    ファイルの接頭辞と接尾辞を設定する方法については、ファイルの接頭辞と接尾辞をご覧ください。

  10. [ファイルのバッチ処理] の場合、新しいファイルを作成するまでの最大経過時間を指定します。

    必要に応じて、ファイルの最大ファイルサイズを設定することもできます。

    両方のファイルバッチ処理オプションの詳細については、ファイルのバッチ処理をご覧ください。

  11. メッセージ エラーを処理するには、デッドレターを有効にすることを強くおすすめします。

    詳細については、デッドレター トピックをご覧ください。

  12. その他の設定はデフォルトのままにして、[作成] をクリックします。

gcloud

  1. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. Cloud Storage サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを実行します。
    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --cloud-storage-bucket=BUCKET_NAME \
        --cloud-storage-file-prefix=CLOUD_STORAGE_FILE_PREFIX \
        --cloud-storage-file-suffix=CLOUD_STORAGE_FILE_SUFFIX \
        --cloud-storage-max-bytes=CLOUD_STORAGE_MAX_BYTES \
        --cloud-storage-max-duration=CLOUD_STORAGE_MAX_DURATION \
        --cloud-storage-output-format=CLOUD_STORAGE_OUTPUT_FORMAT \
        --cloud-storage-write-metadata

    このコマンドで必須なのは、SUBSCRIPTION_ID--topic フラグ、--cloud-storage-bucket フラグの 3 つだけです。残りのフラグはオプションであり、省略できます。

    以下を置き換えます。

    • SUBSCRIPTION_ID: 新しい Cloud Storage サブスクリプションの名前または ID。
    • TOPIC_ID: トピックの名前または ID。
    • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。
    • CLOUD_STORAGE_FILE_PREFIX: Cloud Storage ファイル名の接頭辞を指定します。例: log_events_
    • CLOUD_STORAGE_FILE_SUFFIX: Cloud Storage ファイル名の接尾辞を指定します。例: .txt
    • CLOUD_STORAGE_MAX_BYTES: 新しいファイルが作成される前に Cloud Storage ファイルに書き込むことができる最大バイト。値は 1 KB~10 GB の範囲で指定する必要があります。例: 20MB
    • CLOUD_STORAGE_MAX_DURATION: 新しい Cloud Storage ファイルが作成されるまでの最大時間。値は 1 分~ 10 分にする必要があります。例: 5m
    • CLOUD_STORAGE_OUTPUT_FORMAT: Cloud Storage に書き込まれるデータの出力形式。値は次のとおりです。
      • text: メッセージは未加工のテキストとして書き込まれ、改行で区切られます。
      • avro: メッセージは Avro バイナリとして書き込まれます。 --cloud-storage-write-metadata は、出力形式が avro であるサブスクリプションにのみ効果があります。

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& bucket) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_cloud_storage_config()->set_bucket(bucket);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createCloudStorageSubscription creates a Pub/Sub subscription that exports messages to Cloud Storage.
func createCloudStorageSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, bucket string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// note bucket should not have the gs:// prefix
	// bucket := "my-bucket"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		CloudStorageConfig: pubsub.CloudStorageConfig{
			Bucket:         bucket,
			FilenamePrefix: "log_events_",
			FilenameSuffix: ".avro",
			OutputFormat:   &pubsub.CloudStorageOutputFormatAvroConfig{WriteMetadata: true},
			MaxDuration:    1 * time.Minute,
			MaxBytes:       1e8,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created Cloud Storage subscription: %v\n", sub)

	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.CloudStorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateCloudStorageSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bucket = "your-bucket";
    String filenamePrefix = "log_events_";
    String filenameSuffix = ".text";
    Duration maxDuration = Duration.newBuilder().setSeconds(300).build();

    createCloudStorageSubscription(
        projectId, topicId, subscriptionId, bucket, filenamePrefix, filenameSuffix, maxDuration);
  }

  public static void createCloudStorageSubscription(
      String projectId,
      String topicId,
      String subscriptionId,
      String bucket,
      String filenamePrefix,
      String filenameSuffix,
      Duration maxDuration)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      CloudStorageConfig cloudStorageConfig =
          CloudStorageConfig.newBuilder()
              .setBucket(bucket)
              .setFilenamePrefix(filenamePrefix)
              .setFilenameSuffix(filenameSuffix)
              .setMaxDuration(maxDuration)
              .build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setCloudStorageConfig(cloudStorageConfig)
                  .build());

      System.out.println("Created a CloudStorage subscription: " + subscription.getAllFields());
    }
  }
}

サブスクリプションのモニタリング

Cloud Monitoring には、サブスクリプションをモニタリングするための指標が多数用意されています。

Pub/Sub 内からサブスクリプションをモニタリングすることもできます。

次のステップ