インポート トピックを作成する

インポート トピックを使用すると、外部ソースから Pub/Sub にデータを取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。

Pub/Sub では、インポート トピックにデータを取り込むための外部ソースとして Amazon Kinesis Data Streams がサポートされています。

インポート トピックの概要

インポート トピックでは、プロパティとしてトピックの取り込みが有効になっています。これにより、インポート トピックでストリーミング データを取り込むことができます。トピックの取り込みを有効にするには、コンソール、Google Cloud CLI、REST 呼び出し、またはクライアント ライブラリを使用します。インポート トピックの管理の一環として、Google Cloud は取り込みパイプラインのモニタリングとスケーリングを提供します。

インポート トピックがない場合、データソースから Pub/Sub にデータをストリーミングするには、追加のサービスが必要になります。この追加サービスは、元のソースからデータを pull して Pub/Sub に公開します。追加サービスとしては、Apache Spark などのストリーミング エンジンやカスタム作成サービスなどがあります。また、このサービスの構成、デプロイ、実行、スケーリング、モニタリングも必要です。

インポート トピックに関する重要な情報を次に示します。

  • 標準のトピックと同様に、インポート トピックに手動で公開できます。

  • インポート トピックに接続できる取り込みソースは 1 つのみです。

ストリーミング データのトピックをインポートすることをおすすめします。ストリーミング データの取り込みではなく BigQuery へのバッチデータの取り込みを検討している場合は、BigQuery Data Transfer Service(BQ DTS)を試すことができます。Cloud Storage にデータを取り込む場合は、Storage Transfer Service(STS)がおすすめです。

始める前に

インポート トピックの管理に必要なロールと権限

インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者(roles/pubsub.editor)IAM ロールを付与するように管理者に依頼してください。ロールの付与の詳細については、アクセス権の管理をご覧ください。

この事前定義ロールには、インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

インポート トピックの作成と管理には、次の権限が必要です。

  • インポート トピックを作成する: pubsub.topics.create
  • インポート トピックを削除する: pubsub.topics.delete
  • インポート トピックを取得する: pubsub.topics.get
  • インポート トピックを一覧表示する: pubsub.topics.list
  • インポート トピックに公開する: pubsub.topics.publish
  • インポート トピックを更新する: pubsub.topics.update
  • インポート トピックの IAM ポリシーを取得する: pubsub.topics.getIamPolicy
  • インポート トピックの IAM ポリシーを構成する: pubsub.topics.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

Kinesis Data Streams にアクセスするためのフェデレーション ID を設定する

Workload Identity 連携を使用すると、Google Cloud サービスが Google Cloud の外部で実行されているワークロードにアクセスできるようになります。ID 連携を使用すれば、他のクラウドのリソースにアクセスするために認証情報を維持したり、Google Cloud に渡したりする必要がなくなります。代わりに、ワークロード自体の ID を使用して Google Cloud に対する認証を行い、リソースにアクセスできます。

Google Cloud でサービス アカウントを作成する

このステップの実行は任意です。すでにサービス アカウントがある場合は、新しいサービス アカウントを作成せずに、そのサービス アカウントをこの手順で使用できます。既存のサービス アカウントを使用している場合は、次のステップのためにサービス アカウントの一意の ID を記録するに進みます。

インポート トピックの場合、Pub/Sub はサービス アカウントを ID として使用して、AWS からリソースにアクセスします。

前提条件、必要なロールと権限、命名ガイドラインなど、サービス アカウントの作成の詳細については、サービス アカウントを作成するをご覧ください。サービス アカウントの作成後、サービス アカウントが使用できるようになるまでに 60 秒以上かかる場合があります。この動作は、読み取りオペレーションが結果整合性に基づいているためです。新しいサービス アカウントが利用可能になるまで時間がかかることがあります。

サービス アカウントの一意の ID を記録する

AWS コンソールでロールを設定するには、サービス アカウントの一意の ID が必要です。

  1. Google Cloud コンソールで、[サービス アカウント] の詳細ページに移動します。

    [サービス アカウント] に移動

  2. 作成したサービス アカウントまたは使用する予定のサービス アカウントをクリックします。

  3. [サービス アカウントの詳細] ページで、一意の ID 番号を記録します。

    この ID は、カスタム信頼ポリシーを使用して AWS でロールを作成するセクションで必要になります。

サービス アカウント トークン作成者のロールを Pub/Sub サービス アカウントに追加する

サービス アカウント トークン作成者のロールroles/iam.serviceAccountTokenCreator)を使用すると、プリンシパルはサービス アカウントに有効期間の短い認証情報を作成できます。これらのトークンまたは認証情報は、サービス アカウントの権限を借用するために使用されます。

サービス アカウントの権限借用の詳細については、サービス アカウントの権限借用をご覧ください。

この手順で Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を追加することもできます。ロールの詳細、およびロールを追加する理由については、Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加するをご覧ください。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] オプションを有効にします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. サービス アカウント トークン作成者ロールroles/iam.serviceAccountTokenCreator)を検索して選択します。

  7. [保存] をクリックします。

AWS でポリシーを作成する

Pub/Sub が AWS Kinesis データ ストリームからデータを取り込めるように、AWS で Pub/Sub を認証できるようにするには、AWS でポリシーが必要です。AWS ポリシーを作成する前に、Kinesis データ ストリームと登録済みのコンシューマを作成します。権限を特定のストリームに制限できるように、この方法をおすすめします。

  • AWS Kinesis データ ストリームの作成方法の詳細については、Kinesis データ ストリームをご覧ください。

  • コンシューマの登録に使用される AWS Kinesis データ ストリーム API の詳細については、RegisterStreamConsumer をご覧ください。

  • AWS でポリシーを作成する方法の詳細については、IAM ポリシーの作成をご覧ください。

AWS でポリシーを作成するには、次の手順を行います。

  1. AWS Management Console にログインし、IAM Console を開きます。

  2. [IAM] のコンソールのナビゲーション パネルで、[アクセス管理] > [ポリシー] をクリックします。

  3. [ポリシーの作成] をクリックします。

  4. [サービスを選択] で [Kinesis] を選択します。

  5. [許可されているアクション] で、以下を選択します。

    • [List] > [ListShards]。

      このアクションは、ストリーム内のシャードを一覧表示する権限を付与し、各シャードに関する情報を提供します。

    • [Read] > [SubscribeToShard]。

      このアクションにより、強化されたファンアウトを使用して特定のシャードをリッスンする権限が付与されます。

    • [Read] > [DescribeStreamConsumer]。

      このアクションは、登録済みのストリーム コンシューマの説明を取得する権限を付与します。

    これらの権限は、ストリームからの読み取りをカバーします。Pub/Sub では、ストリーミング SubscribeToShard API を使用した拡張ファンアウトを備えた Kinesis ストリームからの読み取りのみがサポートされます。

  6. [リソース] で、ポリシーを特定のストリームまたはコンシューマに制限する場合は(推奨)、consumer ARN および stream ARN を指定します。

  7. [権限を追加] をクリックします。

  8. [サービスの選択] に「STS」と入力して、選択します。

  9. [許可される操作] で、[書き込み] > [AssumeRoleWithWebIdentity] を選択します。

    このアクションにより、ID 連携を使用して Kinesis データ ストリームへの認証を行う Pub/Sub の一時的なセキュリティ認証情報のセットを取得する権限が付与されます。

  10. [次へ] をクリックします。

  11. ポリシーの名前と説明を入力します。

  12. [ポリシーの作成] をクリックします。

カスタム信頼ポリシーを使用して AWS でロールを作成する

Pub/Sub が AWS に対して認証を行い、Kinesis Data Streams からデータを取り込めるようにするために、AWS でロールを作成する必要があります。

カスタム信頼ポリシーを使用してロールを作成するには、次の手順を行います。

  1. AWS Management Console にログインし、IAM Console を開きます。

  2. [IAM] のコンソールのナビゲーション パネルで [ロール] をクリックします。

  3. [ロールを作成] をクリックします。

  4. [信頼できるエンティティの選択] で [カスタム信頼ポリシー] を選択します。

  5. [カスタム信頼ポリシー] セクションに、以下を入力するか貼り付けます。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> は、サービス アカウントの一意の ID を記録するで記録したサービス アカウントの一意の ID に置き換えます。

  6. [次へ] をクリックします。

  7. [権限を追加] で、作成したカスタム ポリシーを検索して選択します。

  8. [次へ] をクリックします。

  9. ロールの名前と説明を入力します。

  10. [ロールを作成] をクリックします。

Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加する

Pub/Sub が AWS Kinesis Data Streams からインポート トピックに公開できるように、Pub/Sub サービス アカウントにパブリッシャーのロールを割り当てる必要があります。

すべてのトピックからのパブリッシュを有効にする

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] オプションを有効にします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を検索して選択します。

  7. [保存] をクリックします。

単一トピックからのパブリッシュを有効にする

特定のインポート トピックにのみ公開権限を付与するには、次の手順を行います。

  1. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud pubsub topics add-iam-policy-binding コマンドを実行します。

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"
    

    次のように置き換えます。

    • TOPIC_ID は、インポート トピックのトピック ID です。

    • [PROJECT_NUMBER] はプロジェクト番号です。 プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。

サービス アカウントのユーザーロールをサービス アカウントに追加する

サービス アカウント ユーザーのロールroles/iam.serviceAccountUser)には、プリンシパルがサービス アカウントをインポート トピックの取り込み設定に接続し、そのサービス アカウントを連携 ID に使用できるようにする権限 iam.serviceAccounts.actAs が含まれています。

次の手順を行います。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. トピックの作成または更新の呼び出しを発行するプリンシパルに対して、[プリンシパルを編集] ボタンをクリックします。

  3. 必要に応じて、[別のロールを追加] をクリックします。

  4. サービス アカウント ユーザーロールroles/iam.serviceAccountUser)を検索して選択します。

  5. [保存] をクリックします。

インポート トピックを作成する

トピックに関連付けられたプロパティの詳細については、トピックのプロパティをご覧ください。

次の手順を完了していることを確認してください。

インポート トピックを作成する方法は次のとおりです。

コンソール

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、インポート トピックの ID を入力します。

    トピックの命名の詳細については、命名ガイドラインをご覧ください。

  4. [デフォルトのサブスクリプションを追加する] を選択します。

  5. [取り込みを有効にする] を選択します。

  6. 取り込みソースには、[Amazon Kinesis Data Streams] を選択します。

  7. 次の詳細情報を入力します。

    • Kinesis Stream ARN: Pub/Sub に取り込む予定の Kinesis Data Stream の ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN: AWS Kinesis Data Stream に登録されているコンシューマ リソースの ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • サービス アカウント: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。

  8. [トピックを作成] をクリックします。

gcloud

  1. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID です。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、Google Cloud でサービス アカウントを作成するで作成したサービス アカウントです。

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に沿って設定を行ってください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

ARN の詳細については、Amazon リソース名(ARN)IAM 識別子をご覧ください。

問題が発生した場合は、インポート トピックのトラブルシューティングをご覧ください。

インポート トピックを編集する

インポート トピックの取り込みデータソースの設定を編集できます。次の手順を行います。

コンソール

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. 変更するフィールドを更新します。

  5. [更新] をクリックします。

gcloud

  1. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. 次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID です。このフィールドは更新できません。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、Google Cloud でサービス アカウントを作成するで作成したサービス アカウントです。

インポート トピックの割り当てと上限

インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。

次のステップ