Amazon Kinesis Data Streams インポート トピックを作成する

Amazon Kinesis Data Streams インポート トピックを使用すると、外部ソースとして Amazon Kinesis Data Streams から Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。

このドキュメントでは、Amazon Kinesis Data Streams インポート トピックを作成して管理する方法について説明します。標準トピックを作成するには、標準トピックを作成するをご覧ください。

始める前に

インポート トピックの管理に必要なロールと権限

Amazon Kinesis Data Streams インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者(roles/pubsub.editor)IAM ロールを付与するように管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

この事前定義ロールには、Amazon Kinesis Data Streams インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Amazon Kinesis Data Streams インポート トピックを作成、管理するには、次の権限が必要です。

  • インポート トピックを作成する: pubsub.topics.create
  • インポート トピックを削除する: pubsub.topics.delete
  • インポート トピックを取得する: pubsub.topics.get
  • インポート トピックを一覧表示する: pubsub.topics.list
  • インポート トピックに公開する: pubsub.topics.publish
  • インポート トピックを更新する: pubsub.topics.update
  • インポート トピックの IAM ポリシーを取得する: pubsub.topics.getIamPolicy
  • インポート トピックの IAM ポリシーを構成する: pubsub.topics.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

Kinesis Data Streams にアクセスするためのフェデレーション ID を設定する

Workload Identity 連携を使用すると、Google Cloud サービスが Google Cloud の外部で実行されているワークロードにアクセスできるようになります。ID 連携を使用すれば、他のクラウドのリソースにアクセスするために認証情報を維持したり、Google Cloud に渡したりする必要がなくなります。代わりに、ワークロード自体の ID を使用して Google Cloud に対する認証を行い、リソースにアクセスできます。

Google Cloud でサービス アカウントを作成する

このステップの実行は任意です。すでにサービス アカウントがある場合は、新しいサービス アカウントを作成せずに、そのサービス アカウントをこの手順で使用できます。既存のサービス アカウントを使用している場合は、次のステップのためにサービス アカウントの一意の ID を記録するに進みます。

Amazon Kinesis Data Streams インポート トピックの場合、Pub/Sub はサービス アカウントを ID として使用して、AWS からリソースにアクセスします。

前提条件、必要なロールと権限、命名ガイドラインなど、サービス アカウントの作成の詳細については、サービス アカウントを作成するをご覧ください。サービス アカウントの作成後、サービス アカウントが使用できるようになるまでに 60 秒以上かかる場合があります。この動作は、読み取りオペレーションが結果整合性に基づいているためです。新しいサービス アカウントが利用可能になるまで時間がかかることがあります。

サービス アカウントの一意の ID を記録する

AWS コンソールでロールを設定するには、サービス アカウントの一意の ID が必要です。

  1. Google Cloud コンソールで、[サービス アカウント] の詳細ページに移動します。

    [サービス アカウント] に移動

  2. 作成したサービス アカウントまたは使用する予定のサービス アカウントをクリックします。

  3. [サービス アカウントの詳細] ページで、一意の ID 番号を記録します。

    この ID は、カスタム信頼ポリシーを使用して AWS でロールを作成するセクションで必要になります。

サービス アカウント トークン作成者のロールを Pub/Sub サービス アカウントに追加する

サービス アカウント トークン作成者のロールroles/iam.serviceAccountTokenCreator)を使用すると、プリンシパルはサービス アカウントに有効期間の短い認証情報を作成できます。これらのトークンまたは認証情報は、サービス アカウントの権限を借用するために使用されます。

サービス アカウントの権限借用の詳細については、サービス アカウントの権限借用をご覧ください。

この手順で Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を追加することもできます。ロールの詳細、およびロールを追加する理由については、Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加するをご覧ください。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] オプションを有効にします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. サービス アカウント トークン作成者ロールroles/iam.serviceAccountTokenCreator)を検索して選択します。

  7. [保存] をクリックします。

AWS でポリシーを作成する

Pub/Sub が AWS Kinesis データ ストリームからデータを取り込めるように、AWS で Pub/Sub を認証できるようにするには、AWS でポリシーが必要です。AWS ポリシーを作成する前に、Kinesis データ ストリームと登録済みのコンシューマを作成します。権限を特定のストリームに制限できるように、この方法をおすすめします。

  • AWS Kinesis データ ストリームの作成方法の詳細については、Kinesis データ ストリームをご覧ください。

  • コンシューマの登録に使用される AWS Kinesis データ ストリーム API の詳細については、RegisterStreamConsumer をご覧ください。

  • AWS でポリシーを作成する方法の詳細については、IAM ポリシーの作成をご覧ください。

AWS でポリシーを作成するには、次の手順を行います。

  1. AWS Management Console にログインし、IAM Console を開きます。

  2. [IAM] のコンソールのナビゲーション パネルで、[アクセス管理] > [ポリシー] をクリックします。

  3. [ポリシーの作成] をクリックします。

  4. [サービスを選択] で [Kinesis] を選択します。

  5. [許可されているアクション] で、以下を選択します。

    • [List] > [ListShards]。

      このアクションは、ストリーム内のシャードを一覧表示する権限を付与し、各シャードに関する情報を提供します。

    • [Read] > [SubscribeToShard]。

      このアクションにより、強化されたファンアウトを使用して特定のシャードをリッスンする権限が付与されます。

    • [Read] > [DescribeStreamConsumer]。

      このアクションは、登録済みのストリーム コンシューマの説明を取得する権限を付与します。

    これらの権限は、ストリームからの読み取りをカバーします。Pub/Sub では、ストリーミング SubscribeToShard API を使用した拡張ファンアウトを備えた Kinesis ストリームからの読み取りのみがサポートされます。

  6. [リソース] で、ポリシーを特定のストリームまたはコンシューマに制限する場合は(推奨)、consumer ARN および stream ARN を指定します。

  7. [権限を追加] をクリックします。

  8. [サービスの選択] に「STS」と入力して、選択します。

  9. [許可される操作] で、[書き込み] > [AssumeRoleWithWebIdentity] を選択します。

    このアクションにより、ID 連携を使用して Kinesis データ ストリームへの認証を行う Pub/Sub の一時的なセキュリティ認証情報のセットを取得する権限が付与されます。

  10. [次へ] をクリックします。

  11. ポリシーの名前と説明を入力します。

  12. [ポリシーの作成] をクリックします。

カスタム信頼ポリシーを使用して AWS でロールを作成する

Pub/Sub が AWS に対して認証を行い、Kinesis Data Streams からデータを取り込めるようにするために、AWS でロールを作成する必要があります。

カスタム信頼ポリシーを使用してロールを作成するには、次の手順を行います。

  1. AWS Management Console にログインし、IAM Console を開きます。

  2. [IAM] のコンソールのナビゲーション パネルで [ロール] をクリックします。

  3. [ロールを作成] をクリックします。

  4. [信頼できるエンティティの選択] で [カスタム信頼ポリシー] を選択します。

  5. [カスタム信頼ポリシー] セクションに、以下を入力するか貼り付けます。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> は、サービス アカウントの一意の ID を記録するで記録したサービス アカウントの一意の ID に置き換えます。

  6. [次へ] をクリックします。

  7. [権限を追加] で、作成したカスタム ポリシーを検索して選択します。

  8. [次へ] をクリックします。

  9. ロールの名前と説明を入力します。

  10. [ロールを作成] をクリックします。

Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加する

Pub/Sub が AWS Kinesis Data Streams からインポート トピックに公開できるように、Pub/Sub サービス アカウントにパブリッシャーのロールを割り当てる必要があります。

  • プロジェクト内のすべてのトピックからの公開を有効にするには、すべてのトピックからの公開を有効にするをご覧ください。この方法は、AWS Kinesis Data Streams インポート トピックを作成していない場合に使用します。

  • 特定のトピックからの公開を有効にする(推奨)には、単一トピックからの公開を有効にするをご覧ください。この方法は、AWS Kinesis Data Streams インポート トピックがすでに存在する場合にのみ使用してください。

すべてのトピックからのパブリッシュを有効にする

この方法は、AWS Kinesis Data Streams インポート トピックを作成していない場合に使用します。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. [Google 提供のロール付与を含む] オプションを有効にします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を検索して選択します。

  7. [保存] をクリックします。

単一トピックからのパブリッシュを有効にする

この方法は、AWS Kinesis Data Streams インポート トピックがすでに存在する場合にのみ使用してください。

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics add-iam-policy-binding コマンドを実行します。

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    次のように置き換えます。

    • TOPIC_ID は、Amazon Kinesis Data Streams インポート トピックのトピック ID です。

    • [PROJECT_NUMBER] はプロジェクト番号です。 プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。

サービス アカウントのユーザーロールをサービス アカウントに追加する

サービス アカウント ユーザーのロールroles/iam.serviceAccountUser)には、プリンシパルがサービス アカウントを Amazon Kinesis Data Streams インポート トピックの取り込み設定に接続し、そのサービス アカウントを連携 ID に使用できるようにする権限 iam.serviceAccounts.actAs が含まれています。

次の手順を行います。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    [IAM] に移動

  2. トピックの作成または更新の呼び出しを発行するプリンシパルに対して、[プリンシパルを編集] ボタンをクリックします。

  3. 必要に応じて、[別のロールを追加] をクリックします。

  4. サービス アカウント ユーザーロールroles/iam.serviceAccountUser)を検索して選択します。

  5. [保存] をクリックします。

Amazon Kinesis Data Streams インポート トピックを作成する

トピックに関連付けられたプロパティの詳細については、トピックのプロパティをご覧ください。

次の手順を完了していることを確認してください。

トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。定期購入なしでトピックが存在する期間は短いです。この間にトピックにデータが送信された場合、そのデータは失われます。まずトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることがなくなります。

Amazon Kinesis Data Streams インポート トピックを作成するには、次の操作を行います。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、Amazon Kinesis Data Streams インポート トピックの ID を入力します。

    トピックの命名の詳細については、命名ガイドラインをご覧ください。

  4. [デフォルトのサブスクリプションを追加する] を選択します。

  5. [取り込みを有効にする] を選択します。

  6. 取り込みソースには、[Amazon Kinesis Data Streams] を選択します。

  7. 次の詳細情報を入力します。

    • Kinesis Stream ARN: Pub/Sub に取り込む予定の Kinesis Data Stream の ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN: AWS Kinesis Data Stream に登録されているコンシューマ リソースの ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • サービス アカウント: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。

  8. [トピックを作成] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics create コマンドを実行します。

    gcloud pubsub topics create TOPIC_ID \
           --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
           --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
           --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
           --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID です。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、Google Cloud でサービス アカウントを作成するで作成したサービス アカウントです。

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に沿って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

ARN の詳細については、Amazon リソース名(ARN)IAM 識別子をご覧ください。

問題が発生した場合は、Amazon Kinesis Data Streams インポート トピックのトラブルシューティングをご覧ください。

Amazon Kinesis Data Streams インポート トピックを編集する

Amazon Kinesis Data Streams インポート トピックの取り込みデータソースの設定を編集できます。次の手順を行います。

Console

  1. Google Cloud コンソールの トピック ページに移動します。

    [トピック] に移動

  2. Amazon Kinesis Data Streams インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. 変更するフィールドを更新します。

  5. [更新] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. インポート トピックの設定が失われないようにするには、トピックを更新するたびにすべての設定を含めてください。値を指定しない場合は、Pub/Sub によって設定が元のデフォルト値にリセットされます。

    次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

      gcloud pubsub topics update TOPIC_ID \
             --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
             --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
             --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
             --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID です。このフィールドは更新できません。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソースの ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、Google Cloud でサービス アカウントを作成するで作成したサービス アカウントです。

Amazon Kinesis Data Streams インポート トピックの割り当てと上限

インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。

次のステップ