Amazon Managed Streaming for Apache Kafka(Amazon MSK)インポート トピックを使用すると、Amazon MSK から外部ソースとして Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。
このドキュメントでは、Amazon MSK インポート トピックの作成と管理方法について説明します。標準トピックを作成するには、標準トピックを作成するをご覧ください。
インポート トピックの詳細については、インポート トピックについてをご覧ください。
始める前に
Pub/Sub 公開プロセスの詳細を確認する。
次のようなAmazon MSK インポート トピックの管理に必要なロールと権限を構成します。
Google Cloud が外部ストリーミング サービスにアクセスできるように、Workload Identity 連携を設定します。
必要なロールと権限
Amazon MSK インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者 (roles/pubsub.editor
)の IAM ロールを付与するように管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
この事前定義ロールには、Amazon MSK インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Amazon MSK インポート トピックを作成、管理するには、次の権限が必要です。
-
インポート トピックを作成する:
pubsub.topics.create
-
インポート トピックを削除する:
pubsub.topics.delete
-
インポート トピックを取得する:
pubsub.topics.get
-
インポート トピックを一覧表示する:
pubsub.topics.list
-
インポート トピックに公開する:
pubsub.topics.publish
-
インポート トピックを更新する:
pubsub.topics.update
-
インポート トピックの IAM ポリシーを取得する:
pubsub.topics.getIamPolicy
-
インポート トピックの IAM ポリシーを構成する:
pubsub.topics.setIamPolicy
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。
Amazon MSK にアクセスするためのフェデレーション ID を設定する
Workload Identity 連携を使用すると、 Google Cloud サービスが外部で実行されているワークロードにアクセスできるようになります。 Google CloudID 連携を使用すれば、他のクラウドのリソースにアクセスするために認証情報を維持したり、 Google Cloud に渡したりする必要がなくなります。代わりに、ワークロード自体の ID を使用して Google Cloud 認証を行い、リソースにアクセスできます。
Google Cloudにサービス アカウントを作成する
このステップの実行は任意です。すでにサービス アカウントがある場合は、新しいサービス アカウントを作成せずに、そのサービス アカウントをこの手順で使用できます。既存のサービス アカウントを使用している場合は、次のステップのためにサービス アカウントの一意の ID を記録するに進みます。
Amazon MSK インポート トピックの場合、Pub/Sub はサービス アカウントを ID として使用して、AWS からリソースにアクセスします。
前提条件、必要なロールと権限、命名ガイドラインなど、サービス アカウントの作成の詳細については、サービス アカウントを作成するをご覧ください。サービス アカウントの作成後、サービス アカウントが使用できるようになるまでに 60 秒以上かかる場合があります。この動作は、読み取りオペレーションが結果整合性に基づいているためです。新しいサービス アカウントが利用可能になるまで時間がかかることがあります。
サービス アカウントの一意の ID を記録する
AWS コンソールでロールを設定するには、サービス アカウントの一意の ID が必要です。
Google Cloud コンソールで、[サービス アカウント] の詳細ページに移動します。
作成したサービス アカウントまたは使用する予定のサービス アカウントをクリックします。
[サービス アカウントの詳細] ページで、一意の ID 番号を記録します。
この ID は、AWS コンソールでロールを設定するワークフローの一部として必要になります。
サービス アカウント トークン作成者のロールを Pub/Sub サービス アカウントに追加する
サービス アカウント トークン作成者のロール(roles/iam.serviceAccountTokenCreator
)を使用すると、プリンシパルはサービス アカウントに有効期間の短い認証情報を作成できます。これらのトークンまたは認証情報は、サービス アカウントの権限を借用するために使用されます。
サービス アカウントの権限借用の詳細については、サービス アカウントの権限借用をご覧ください。
この手順で Pub/Sub パブリッシャーのロール(roles/pubsub.publisher
)を追加することもできます。ロールの詳細、およびロールを追加する理由については、Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加するをご覧ください。
Google Cloud コンソールの [IAM] ページに移動します。
[Google 提供のロール付与を含める Google] チェックボックスをオンにします。
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
形式のサービス アカウントを探します。このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
サービス アカウント トークン作成者のロール(
roles/iam.serviceAccountTokenCreator
)を検索してクリックします。[保存] をクリックします。
AWS でポリシーを作成する
Pub/Sub が Amazon MSK からデータを取り込めるように、AWS で Pub/Sub を認証できるようにするには、AWS でポリシーが必要です。
- AWS でポリシーを作成する方法の詳細については、IAM ポリシーの作成をご覧ください。
AWS でポリシーを作成するには、次の手順を行います。
AWS Management Console にログインし、IAM Console を開きます。
[IAM] のコンソールのナビゲーション パネルで、[アクセス管理] > [ポリシー] をクリックします。
[ポリシーを作成] をクリックします。
[サービスをクリック] で [MSK] をクリックします。
[許可される操作] で、[読み取り] > [GetBootstrapBrokers] をクリックします。
このアクションにより、Pub/Sub が MSK クラスタへの接続に使用するブートストラップ ブローカーを取得する権限が付与されます。
[権限を追加] をクリックします。
[サービスを選択] で、[Apache Kafka APIs for MSK] をクリックします。
[許可されているアクション] で、以下を選択します。
[List] > [DescribeTopic]
このアクションにより、Pub/Sub 取り込みトピックが Amazon MSK Kafka トピックの詳細を取得する権限が付与されます。
[Read] > [ReadData]
このアクションにより、Amazon MSK Kafka トピックからデータを読み取る権限が付与されます。
[書き込み] > [接続]
このアクションにより、Amazon MSK Kafka クラスタへの接続と認証の権限が付与されます。
[リソース] で、ポリシーを特定のクラスタに制限する場合は(推奨)、クラスタ ARN を指定します。
[権限を追加] をクリックします。
[サービスの選択] で [STS] をクリックします。
[許可される操作] で、[書き込み] > [AssumeRoleWithWebIdentity] をクリックします。
このアクションにより、ID 連携を使用して Amazon MSK への認証を行う Pub/Sub の一時的なセキュリティ認証情報のセットを取得する権限が付与されます。
[次へ] をクリックします。
ポリシーの名前と説明を入力します。
[ポリシーの作成] をクリックします。
カスタム信頼ポリシーを使用して AWS でロールを作成する
Pub/Sub が AWS に対して認証を行い、Amazon MSK からデータを取り込めるようにするために、AWS でロールを作成する必要があります。
AWS Management Console にログインし、IAM Console を開きます。
[IAM] のコンソールのナビゲーション パネルで [ロール] をクリックします。
[ロールを作成] をクリックします。
[信頼できるエンティティの選択] で、[カスタム信頼ポリシー] をクリックします。
[カスタム信頼ポリシー] セクションに、以下を入力するか貼り付けます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Federated": "accounts.google.com" }, "Action": "sts:AssumeRoleWithWebIdentity", "Condition": { "StringEquals": { "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>" } } } ] }
<SERVICE_ACCOUNT_UNIQUE_ID>
は、サービス アカウントの一意の ID を記録するで記録したサービス アカウントの一意の ID に置き換えます。[次へ] をクリックします。
[権限を追加] で、作成したカスタム ポリシーを検索してクリックします。
[次へ] をクリックします。
ロールの名前と説明を入力します。
[ロールを作成] をクリックします。
Pub/Sub プリンシパルに Pub/Sub パブリッシャーのロールを追加する
パブリッシングを有効にするには、Pub/Sub が Amazon MSK インポート トピックにパブリッシュできるように、Pub/Sub サービス アカウントにパブリッシャーのロールを割り当てる必要があります。
すべてのトピックからのパブリッシュを有効にする
Amazon MSK インポート トピックを作成していない場合は、この方法を使用します。
Google Cloud コンソールの [IAM] ページに移動します。
[Google 提供のロール付与を含める Google] チェックボックスをオンにします。
service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com
形式のサービス アカウントを探します。このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
Pub/Sub パブリッシャーのロール(
roles/pubsub.publisher
)を検索してクリックします。[保存] をクリックします。
単一トピックからのパブリッシュを有効にする
この方法は、Amazon MSK インポート トピックがすでに存在する場合にのみ使用してください。
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud pubsub topics add-iam-policy-binding
コマンドを実行します。gcloud pubsub topics add-iam-policy-binding TOPIC_ID \ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \ --role="roles/pubsub.publisher"
次のように置き換えます。
TOPIC_ID
: Amazon MSK インポート トピックのトピック ID。PROJECT_NUMBER
: プロジェクト番号。プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。
サービス アカウントのユーザーロールをサービス アカウントに追加する
サービス アカウント ユーザーのロール(roles/iam.serviceAccountUser
)には、プリンシパルがサービス アカウントを Amazon MSK インポート トピックの取り込み設定に接続し、そのサービス アカウントを連携 ID に使用できるようにする権限 iam.serviceAccounts.actAs
が含まれています。
Google Cloud コンソールの [IAM] ページに移動します。
トピックの作成または更新の呼び出しを発行するプリンシパルに対して、[プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
サービス アカウント ユーザーのロール(
roles/iam.serviceAccountUser
)を検索してクリックします。[保存] をクリックします。
Amazon MSK インポート トピックを使用する
新しいインポート トピックを作成することも、既存のトピックを編集することもできます。
考慮事項
トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。定期購入なしでトピックが存在する期間は短いです。この間にトピックにデータが送信された場合、そのデータは失われます。まずトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることがなくなります。
既存のインポート トピックの Kafka トピックを同じ名前で再作成する必要がある場合、Kafka トピックを削除して再作成するだけでは不十分です。この操作により、Pub/Sub のオッサフ管理が無効になり、データが失われる可能性があります。この問題を軽減する手順は次のとおりです。
- Pub/Sub インポート トピックを削除します。
- Kafka トピックを削除します。
- Kafka トピックを作成します。
- Pub/Sub インポート トピックを作成します。
Amazon MSK Kafka トピックのデータは、常に最も早いオフセットから読み取られます。
Amazon MSK インポート トピックを作成する
トピックに関連付けられたプロパティの詳細については、トピックのプロパティをご覧ください。
次の手順を完了していることを確認してください。
Amazon MSK インポート トピックを作成するには、次の操作を行います。
Console
Google Cloud コンソールの トピック ページに移動します。
[トピックを作成] をクリックします。
[トピック ID] フィールドに、Amazon MSK インポート トピックの ID を入力します。トピックの命名の詳細については、命名ガイドラインをご覧ください。
[デフォルトのサブスクリプションを追加する] を選択します。
[取り込みを有効にする] を選択します。
取り込みソースには、[Amazon MSK] を選択します。
次の詳細情報を入力します。
クラスタの ARN: Pub/Sub に取り込む Amazon MSK の ARN。ARN の形式は次のとおりです。
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
。トピック: Pub/Sub に取り込む Amazon MSK Kafka トピックの名前。
AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです。
arn:aws:iam:${Account}:role/${RoleName}
。サービス アカウント: Google Cloudでサービス アカウントを作成するで作成したサービス アカウント。
[トピックを作成] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud pubsub topics create
コマンドを実行します。gcloud pubsub topics create TOPIC_ID \ --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \ --aws-msk-ingestion-topic MSK_TOPIC \ --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \ --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
次のように置き換えます。
TOPIC_ID
: Pub/Sub トピックの名前または ID。MSK_CLUSTER_ARN
: Pub/Sub に取り込む Amazon MSK クラスタの ARN。ARN の形式は次のとおりです。arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
。MSK_TOPIC
: Pub/Sub に取り込む Amazon MSK Kafka トピックの名前。MSK_ROLE_ARN
: AWS ロールの ARN。ロールの ARN 形式は次のとおりです。arn:aws:iam:${Account}:role/${RoleName}
。PUBSUB_SERVICE_ACCOUNT
: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
Go
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Java
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Node.js
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Python
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
C++
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
Node.js (TypeScript)
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Node.js 向けの手順に沿って設定を行ってください。 詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
ARN の詳細については、Amazon リソース名(ARN)と IAM 識別子をご覧ください。
問題が発生した場合は、Amazon MSK インポート トピックのトラブルシューティングをご覧ください。
Amazon MSK インポート トピックを編集する
Amazon MSK インポート トピックの取り込みデータソースの設定を編集する手順は次のとおりです。
Console
Google Cloud コンソールの トピック ページに移動します。
Amazon MSK インポート トピックをクリックします。
[トピックの詳細] ページで、[編集] をクリックします。
変更するフィールドを更新します。
[更新] をクリックします。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
次のサンプルに記載されているフラグをすべて指定して、
gcloud pubsub topics update
コマンドを実行します。gcloud pubsub topics update TOPIC_ID \ --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \ --aws-msk-ingestion-topic MSK_TOPIC \ --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \ --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
次のように置き換えます。
- TOPIC_ID: Pub/Sub トピックの名前または ID。
- MSK_CLUSTER_ARN: Pub/Sub に取り込む Amazon MSK クラスタの ARN。ARN の形式は
arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
です。 - MSK_TOPIC: Pub/Sub に取り込む Amazon MSK Kafka トピックの名前。
- MSK_ROLE_ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです。
arn:aws:iam:${Account}:role/${RoleName}
。 - PUBSUB_SERVICE_ACCOUNT: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
割り当てと上限
インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。
次のステップ
トピックのサブスクリプションの種類を選択する。
トピックにメッセージを公開する方法を学習する。
gcloud CLI、REST API、またはクライアント ライブラリを使用して、トピックを作成または変更する。