Cloud Storage インポート トピックを作成する

Cloud Storage インポート トピックを使用すると、Cloud Storage から Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。Pub/Sub は、Cloud Storage バケットに追加された新しいオブジェクトを自動的に検出して取り込みます。

Cloud Storage は、Google Cloud でオブジェクトを保存するためのサービスです。オブジェクトとは、任意の形式のファイルで構成される不変のデータのことです。オブジェクトはバケットと呼ばれるコンテナに保存します。バケットにマネージド フォルダを含めることもできます。このフォルダを使用すると、共有の名前接頭辞を持つオブジェクトのグループにアクセスできます。

Cloud Storage の詳細については、Cloud Storage のドキュメントをご覧ください。

インポート トピックの詳細については、インポート トピックについてをご覧ください。

始める前に

Cloud Storage インポート トピックの管理に必要なロールと権限

Cloud Storage インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

この事前定義ロールには、Cloud Storage インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Cloud Storage インポート トピックの作成と管理には、次の権限が必要です。

  • インポート トピックを作成する: pubsub.topics.create
  • インポート トピックを削除する: pubsub.topics.delete
  • インポート トピックを取得する: pubsub.topics.get
  • インポート トピックを一覧表示する: pubsub.topics.list
  • インポート トピックに公開する: pubsub.topics.publish
  • インポート トピックを更新する: pubsub.topics.update
  • インポート トピックの IAM ポリシーを取得する: pubsub.topics.getIamPolicy
  • インポート トピックの IAM ポリシーを構成する: pubsub.topics.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

メッセージ ストレージ ポリシーがバケットのロケーションに準拠している

Pub/Sub トピックのメッセージ ストレージ ポリシーは、Cloud Storage バケットが配置されているリージョンと重複する必要があります。このポリシーは、Pub/Sub がメッセージ データを保存できる場所を指定します。

  • ロケーション タイプがリージョンのバケットの場合: ポリシーにその特定のリージョンを含める必要があります。たとえば、バケットが us-central1 リージョンにある場合、メッセージ ストレージ ポリシーにも us-central1 を含める必要があります。

  • ロケーション タイプがデュアルリージョンまたはマルチリージョンのバケットの場合: ポリシーには、デュアルリージョンまたはマルチリージョンのロケーション内に少なくとも 1 つのリージョンを含める必要があります。たとえば、バケットが US multi-region にある場合、メッセージ ストレージ ポリシーには us-central1us-east1、または US multi-region 内の他のリージョンを含めることができます。

    ポリシーにバケットのリージョンが含まれていない場合、トピックの作成は失敗します。たとえば、バケットが europe-west1 にあり、メッセージ ストレージ ポリシーに asia-east1 のみが含まれている場合は、エラーが発生します。

    メッセージ ストレージ ポリシーに、バケットのロケーションと重複するリージョンが 1 つしかない場合、マルチリージョン冗長性が損なわれる可能性があります。これは、その単一リージョンが使用できなくなった場合に、データにアクセスできなくなる可能性があるためです。完全な冗長性を確保するには、バケットのマルチリージョンまたはデュアルリージョンのロケーションに含まれるリージョンを少なくとも 2 つ、メッセージ ストレージ ポリシーに含めることをおすすめします。

バケットのロケーションの詳細については、ドキュメントをご覧ください。

Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加する

Pub/Sub が Cloud Storage インポート トピックにパブリッシュできるように、Pub/Sub サービス アカウントに Pub/Sub パブリッシャーのロールを割り当てる必要があります。

  • プロジェクト内のすべてのトピックへの公開を有効にするには、すべてのトピックへの公開を有効にするをご覧ください。Cloud Storage インポート トピックを作成していない場合は、この方法を使用します。

  • 特定のトピックへの公開を有効にするには、単一トピックへの公開を有効にするをご覧ください。この方法は、Cloud Storage インポート トピックがすでに存在する場合にのみ使用します。

すべての Cloud Storage インポート トピックへのパブリッシュを有効にする

プロジェクトで使用できる Cloud Storage インポート トピックがない場合は、このオプションを選択します。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] オプションを有効にします。

  3. 次の形式の Pub/Sub サービス アカウントを探します。

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を検索して選択します。

  7. [保存] をクリックします。

単一の Cloud Storage インポート トピックへのパブリッシュを有効にする

すでに存在する特定の Cloud Storage インポート トピックにパブリッシュする権限を Pub/Sub に付与するには、次の操作を行います。

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics add-iam-policy-binding コマンドを実行します。

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    次のように置き換えます。

    • TOPIC_ID は、Cloud Storage インポート トピックの ID または名前です。

    • [PROJECT_NUMBER] はプロジェクト番号です。 プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。

Pub/Sub サービス アカウントに Cloud Storage のロールを割り当てる

Cloud Storage インポート トピックを作成するには、Pub/Sub サービス アカウントに、特定の Cloud Storage バケットからの読み取り権限が必要です。次の権限が必要です。

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

これらの権限を Pub/Sub サービス アカウントに割り当てるには、次のいずれかの方法を選択します。

  • バケット レベルで権限を付与します。特定の Cloud Storage バケットで、Storage Legacy Object Reader(roles/storage.legacyObjectReader)ロールと Storage Legacy Bucket Reader(roles/storage.legacyBucketReader)ロールを Pub/Sub サービス アカウントに付与します。

  • プロジェクト レベルでロールを付与する必要がある場合は、代わりに Cloud Storage バケットを含むプロジェクトでストレージ管理者(roles/storage.admin)のロールを付与できます。Pub/Sub サービス アカウントに このロールを付与します。

バケットの権限

次の手順で、バケットレベルで Storage Legacy Object Reader(roles/storage.legacyObjectReader)のロールと Storage Legacy Bucket Reader(roles/storage.legacyBucketReader)のロールを Pub/Sub サービス アカウントに付与します。

  1. Google Cloud コンソールで [Cloud Storage] ページに移動します。

    [Cloud Storage] に移動

  2. メッセージを読み取り、Cloud Storage インポート トピックにインポートする Cloud Storage バケットをクリックします。

    [バケットの詳細] ページが開きます。

  3. [バケットの詳細] ページで、[権限] タブをクリックします。

  4. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

    [アクセス権を付与] ページが開きます。

  5. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

    サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  6. [ロールを割り当て] > [ロールを選択] プルダウンで、「Object Reader」と入力し、[Storage Legacy Object Reader] ロールを選択します。

  7. [別のロールを追加] をクリックします。

  8. [ロールを選択] プルダウンで、Bucket Reader を入力し、[Storage Legacy Bucket Reader] のロールを選択します。

  9. [保存] をクリックします。

プロジェクトの権限

次の手順を行って、プロジェクト レベルでストレージ管理者(roles/storage.admin)ロールを付与します。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    IAM に移動

  2. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

    [アクセス権を付与] ページが開きます。

  3. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

    サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  4. [ロールを割り当て] > [ロールを選択] プルダウンで、「Storage Admin」と入力し、[ストレージ管理者] ロールを選択します。

  5. [保存] をクリックします。

Cloud Storage IAM の詳細については、Cloud Storage Identity and Access Management をご覧ください。

Cloud Storage インポート トピックのプロパティ

すべてのトピックに共通するプロパティの詳細については、トピックのプロパティをご覧ください。

バケット名

これは、Cloud Storage インポート トピックにパブリッシュされたデータを Pub/Sub が読み取る Cloud Storage バケットの名前です。

入力形式

Cloud Storage インポート トピックを作成するときに、取り込むオブジェクトの形式を テキストAvroPub/Sub Avro として指定できます。

  • テキスト。オブジェクトにはプレーンテキストのデータが格納されていると見なされます。この入力形式では、オブジェクトが最小オブジェクト作成時間を満たし、glob パターンの条件を満たしている限り、バケット内のすべてのオブジェクトを取り込むことを試みます。

    区切り文字。オブジェクトをメッセージに分割する区切り文字を指定することもできます。未設定の場合、デフォルトは改行文字(\n)になります。区切り文字は 1 文字にする必要があります。

  • Avro。オブジェクトは Apache Avro バイナリ形式です。有効な Apache Avro 形式ではないオブジェクトは取り込まれません。Avro に関する制限事項は次のとおりです。

    • Avro バージョン 1.1.0 と 1.2.0 はサポートされていません。
    • Avro ブロックの最大サイズは 16 MB です。
  • Pub/Sub Avro。オブジェクトは Apache Avro バイナリ形式で、Avro ファイル形式の Pub/Sub Cloud Storage サブスクリプションを使用して Cloud Storage に書き込まれたオブジェクトのスキーマと一致します。Pub/Sub Avro に関する重要なガイドラインは次のとおりです。

    • Avro レコードのデータフィールドは、生成された Pub/Sub メッセージのデータフィールドに入力するために使用されます。

    • Cloud Storage サブスクリプションに write_metadata オプションが指定されている場合、attributes フィールドの値は、生成された Pub/Sub メッセージの属性として入力されます。

    • Cloud Storage に書き込まれた元のメッセージで順序指定キーが指定されている場合、このフィールドは、生成された Pub/Sub メッセージに original_message_ordering_key という名前の属性として入力されます。

最短のオブジェクト作成時間

必要に応じて、Cloud Storage インポート トピックを作成するときに、最小オブジェクト作成時間を指定できます。このタイムスタンプ以降に作成されたオブジェクトのみが取り込まれます。このタイムスタンプは、YYYY-MM-DDThh:mm:ssZ などの形式で指定する必要があります。0001-01-01T00:00:00Z9999-12-31T23:59:59Z までの過去または未来の日付が有効です。

glob パターンを照合する

Cloud Storage インポート トピックを作成するときに、一致するグロブパターンを指定することもできます。このパターンに一致する名前のオブジェクトのみが取り込まれます。たとえば、接尾辞 .txt が付いたすべてのオブジェクトを取り込むには、glob パターンを **.txt として指定します。

glob パターンでサポートされている構文については、Cloud Storage のドキュメントをご覧ください。

Cloud Storage インポート トピックを作成する

次の手順を完了していることを確認してください。

トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。定期購入なしでトピックが存在する期間は短いです。この間にトピックにデータが送信された場合、そのデータは失われます。まずトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることがなくなります。

Cloud Storage インポート トピックを作成するには、次の操作を行います。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

    トピックの詳細ページが開きます。

  3. [トピック ID] フィールドに、Cloud Storage インポート トピックの ID を入力します。

    トピックの命名の詳細については、命名ガイドラインをご覧ください。

  4. [デフォルトのサブスクリプションを追加する] を選択します。

  5. [取り込みを有効にする] を選択します。

  6. 取り込み元として [Google Cloud Storage] を選択します。

  7. Cloud Storage バケットの場合は、[参照] をクリックします。

    [バケットの選択] ページが開きます。次のオプションのいずれかを選択します。

    • 適切なプロジェクトから既存のバケットを選択します。

    • 作成アイコンをクリックし、画面上の手順に沿って新しいバケットを作成します。バケットを作成したら、Cloud Storage インポート トピックのバケットを選択します。

  8. バケットを指定すると、Pub/Sub は Pub/Sub サービス アカウントのバケットに対する適切な権限を確認します。権限の問題がある場合は、次のようなメッセージが表示されます。

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    権限の問題が発生した場合は、[権限を設定] をクリックします。詳細については、Pub/Sub サービス アカウントに Cloud Storage 権限を付与するをご覧ください。

  9. [オブジェクト形式] で、[テキスト]、[Avro]、または [Pub/Sub Avro] を選択します。

    [テキスト] を選択した場合は、オブジェクトをメッセージに分割する区切り文字を必要に応じて指定できます。

    これらのオプションの詳細については、入力形式をご覧ください。

  10. 省略可。トピックに最小オブジェクト作成時間を指定できます。設定した場合、最小オブジェクト作成時間後に作成されたオブジェクトのみが取り込まれます。

    詳細については、最短のオブジェクト作成時間をご覧ください。

  11. Glob パターンを指定する必要があります。バケット内のすべてのオブジェクトを取り込むには、glob パターンとして ** を使用します。設定すると、指定されたパターンに一致するオブジェクトのみが取り込まれます。

    詳細については、glob パターンを照合するをご覧ください。

  12. 他のデフォルト設定はそのままにします。
  13. [トピックを作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    このコマンドでは、TOPIC_ID--cloud-storage-ingestion-bucket フラグ、--cloud-storage-ingestion-input-format フラグのみが必須です。残りのフラグはオプションであり、省略できます。

    次のように置き換えます。

    • TOPIC_ID: トピックの名前または ID。

    • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。

    • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。値は textavropubsub_avro です。これらのオプションの詳細については、入力形式をご覧ください。

    • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があります。また、INPUT_FORMATtext の場合にのみ設定する必要があります。デフォルトは改行文字(\n)です。

      gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。区切り文字が正しく解釈されるように、形式 '\n' を使用します。引用符やエスケープなしで \n を使用すると、区切り文字が "n" になります。

    • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC の YYYY-MM-DDThh:mm:ssZ 形式で指定します。例: 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z までの過去または未来の日付はすべて有効です。

    • MATCH_GLOB: オブジェクトを取り込むために一致させる glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含むマッチ グロブでは、* 文字を \*\*.txt 形式でエスケープするか、マッチ グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に沿って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

問題が発生した場合は、Cloud Storage インポート トピックのトラブルシューティングをご覧ください。

Cloud Storage インポート トピックを編集する

Cloud Storage インポート トピックを編集して、プロパティを更新できます。

たとえば、取り込みを再開するには、バケットを変更するか、オブジェクトの最小作成時間を更新します。

Cloud Storage インポート トピックを編集する手順は次のとおりです。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. Cloud Storage インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. 変更するフィールドを更新します。

  5. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. インポート トピックの設定が失われないようにするには、トピックを更新するたびにすべての設定を含めてください。値を指定しない場合は、Pub/Sub によって設定が元のデフォルト値にリセットされます。

    次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。

    • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。値は textavropubsub_avro です。これらのオプションの詳細については、 入力形式をご覧ください。

    • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があります。INPUT_FORMATtext の場合にのみ設定します。デフォルトは改行文字(\n)です。

      gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。形式 '\n' を使用して、区切り文字が正しく解釈されるようにします。引用符やエスケープなしで \n を使用すると、区切り文字が "n" になります。

    • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC の YYYY-MM-DDThh:mm:ssZ 形式で指定します。例: 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z までの過去または未来の日付はすべて有効です。

    • MATCH_GLOB: オブジェクトを取り込むために一致させる glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含むマッチ グロブでは、* 文字を \*\*.txt 形式でエスケープするか、マッチ グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。

Cloud Storage インポート トピックの割り当てと上限

インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。

次のステップ