トピックの種類を変更する

インポート トピックは標準トピックに、あるいは標準トピックをインポート トピックに変換できます。

インポート トピックを標準トピックに変換する

インポート トピックを標準トピックに変換するには、取り込み設定をクリアします。次の手順を行います。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] チェックボックスをオフにします。

  5. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID は、トピック ID に置き換えます。

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換する

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換するには、まず、すべての前提条件を満たしていることを確認します。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込みソースには、[Amazon Kinesis Data Streams] を選択します。

  6. 次の詳細情報を入力します。

    • Kinesis Stream ARN: Pub/Sub に取り込む予定の Kinesis Data Stream の ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN: AWS Kinesis Data Stream に登録されているコンシューマ リソースの ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • サービス アカウント: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。

  7. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、Google Cloud でサービス アカウントを作成するで作成したサービス アカウントです。

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に沿って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

ARN の詳細については、Amazon リソース名(ARN)IAM 識別子をご覧ください。

標準トピックを Cloud Storage インポート トピックに変換する

標準トピックを Cloud Storage インポート トピックに変換するには、まず、すべての前提条件を満たしていることを確認します。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. Cloud Storage インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込み元として [Google Cloud Storage] を選択します。

  6. Cloud Storage バケットの場合は、[参照] をクリックします。

    [バケットの選択] ページが開きます。次のオプションのいずれかを選択します。

    • 適切なプロジェクトから既存のバケットを選択します。

    • 作成アイコンをクリックし、画面上の手順に沿って新しいバケットを作成します。バケットを作成したら、Cloud Storage インポート トピックのバケットを選択します。

  7. バケットを指定すると、Pub/Sub は Pub/Sub サービス アカウントのバケットに対する適切な権限を確認します。権限に問題がある場合は、権限に関連するエラー メッセージが表示されます。

    権限の問題が発生した場合は、[権限を設定] をクリックします。詳細については、 Pub/Sub サービス アカウントに Cloud Storage 権限を付与するをご覧ください。

  8. [オブジェクト形式] で、[テキスト]、[Avro]、または [Pub/Sub Avro] を選択します。

    [テキスト] を選択した場合は、オブジェクトをメッセージに分割する区切り文字を必要に応じて指定できます。

    これらのオプションの詳細については、入力形式をご覧ください。

  9. 省略可。トピックに最小オブジェクト作成時間を指定できます。設定した場合、最小オブジェクト作成時間後に作成されたオブジェクトのみが取り込まれます。

    詳細については、 最短のオブジェクト作成時間をご覧ください。

  10. Glob パターンを指定する必要があります。バケット内のすべてのオブジェクトを取り込むには、glob パターンとして ** を使用します。指定されたパターンに一致するオブジェクトのみが取り込まれます。

    詳細については、 glob パターンを照合するをご覧ください。

  11. 他のデフォルト設定はそのままにします。
  12. [トピックを更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. インポート トピックの設定が失われないようにするには、トピックを更新するたびにすべての設定を含めてください。値を指定しない場合は、Pub/Sub によって設定が元のデフォルト値にリセットされます。

    次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。

    • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。値は textavropubsub_avro です。これらのオプションの詳細については、入力形式をご覧ください。

    • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があります。また、INPUT_FORMATtext の場合にのみ設定する必要があります。デフォルトは改行文字(\n)です。

      gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。形式 '\n' を使用して、区切り文字が正しく解釈されるようにします。引用符やエスケープなしで \n を使用すると、区切り文字が "n" になります。

    • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC の YYYY-MM-DDThh:mm:ssZ 形式で指定します。例: 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z までの過去または未来の日付はすべて有効です。

    • MATCH_GLOB: オブジェクトを取り込むために一致させる glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含むマッチ グロブでは、* 文字を \*\*.txt 形式でエスケープするか、マッチ グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。