Application Integration でサポートされているコネクタをご覧ください。

Apache Kafka トリガー

Apache Kafka トリガーは、Apache Kafka 接続に登録している Apache Kafka イベントに基づいて統合を呼び出すことができるコネクタ イベント トリガーです。

始める前に

Apache Kafka トリガーの新しい接続を作成または構成する場合は、プロジェクトに対する次の Identity and Access Management(IAM)ロールがあることを確認してください。

  • Connectors 管理者( roles/connectors.admin
  • ロールの付与については、アクセスの管理をご覧ください。

Apache Kafka トリガーに使用するサービス アカウントに、次の IAM ロールを付与します。

制約事項

Apache Kafka トリガーには次の制限があります。

  • イベント サブスクリプションで SASL_SSL セキュリティ プロトコルをサポートします。
  • イベント サブスクリプションで mTLS はサポートされていません。

Apache Kafka トリガーを追加する

統合に Apache Kafka トリガーを追加する手順は次のとおりです。

  1. Google Cloud コンソールで [Application Integration] ページに移動します。

    Application Integration に移動

  2. ナビゲーション メニューで [統合] をクリックします。

    [統合リスト] ページが開き、Google Cloud プロジェクトで使用可能なすべての統合が一覧表示されます。

  3. 既存の統合を選択するか、[統合の作成] をクリックして新しい統合を作成します。

    新しい統合を作成する場合:

    1. [統合の作成] ペインで名前と説明を入力します。
    2. 統合のリージョンを選択します。
    3. 統合用のサービス アカウントを選択します。統合のサービス アカウントの詳細は、統合ツールバーの [統合の概要] ペインでいつでも変更または更新できます。
    4. [作成] をクリックします。

    統合エディタで統合が開きます。

  4. 統合エディタのナビゲーション バーで、[トリガー] をクリックして、使用可能なトリガーのリストを表示します。
  5. 統合エディタで、[Apache Kafka トリガー] 要素をクリックして配置します。
  6. Apache Kafka トリガーを構成するには、Integration Connectors で使用可能な既存の Apache Kafka 接続を使用するか、インライン接続作成オプションを使用して新しい Apache Kafka 接続を作成します。

既存の接続を使用して Apache Kafka トリガーを構成する

Integration Connectors で既存の Apache Kafka 接続を使用して、Apache Kafka トリガーを構成できます。なお、Apache Kafka トリガーコネクタ イベント トリガーであるため、トリガーの構成にはイベント サブスクリプションを有効にした Apache Kafka 接続のみが使用できます。

新しい Apache Kafka 接続を使用して Apache Kafka トリガーを構成する方法については、新しい接続を使用して Apache Kafka トリガーを構成するを参照してください

既存の Apache Kafka 接続を使用して Apache Kafka トリガーを構成するには、次の手順を実行します。

  1. 統合エディタで [Apache Kafka トリガー] 要素をクリックして、トリガー構成ペインを開きます。
  2. [トリガーの構成] をクリックします。
  3. [Connector Event Trigger Editor] ページで、構成の詳細を次のとおりに入力します。
    1. リージョン: Apache Kafka 接続のリージョンを選択します。
    2. 接続: 使用する Apache Kafka 接続を選択します。

      Application Integration には、アクティブでイベント サブスクリプションが有効になっている Apache Kafka 接続のみが表示されます。

    3. トピック名: トピック名を入力します。
    4. コンシューマ グループ ID: 必要に応じて、Kafka コンシューマが属するコンシューマ グループの名前を指定するコンシューマ グループ ID を入力します。
    5. Partition:特定のパーティションにサブスクライブする場合は、パーティションを入力します。パーティションとは、トピックを複数の部分に分割し、各部分をクラスタ内の別のノードに保存する方法です。
    6. 初期オフセットオフセットリセット戦略を次のように選択します。
      • [最も早い] を選択すると、接続では、コンシューマ グループの存続期間より前に生成されたメッセージを含む、未消費のメッセージが使用されます。
      • [最も新しい] を選択すると、コンシューマ グループの作成後に生成されたメッセージが接続で使用されます。
    7. サービス アカウント:Apache Kafka トリガーに必要な IAM ロールを持つサービス アカウントを選択します。
  4. [完了] をクリックします。

新しい接続を使用して Apache Kafka トリガーを構成する

新しい Apache Kafka 接続を使用して Apache Kafka トリガーを構成するには、次の手順に従います。

  1. 統合エディタで [Apache Kafka trigger] 要素をクリックして、トリガー構成ペインを開きます。
  2. Apache Kafka トリガーの構成をクリックします。
  3. [リージョン] フィールドはスキップします。
  4. [接続] をクリックし、リストから [接続の作成] オプションを選択します。[接続を作成] ペインが表示されます。
  5. [ロケーション] ステップで、新しい Apache Kafka 接続のロケーションを次のように選択します。
    1. リージョン:リストからリージョンを選択します。
    2. [次へ] をクリックします。
  6. [接続の詳細] ステップで、新しい Apache Kafka 接続の詳細を入力します。
    1. コネクタのバージョン:リストから利用可能な Apache Kafka コネクタのバージョンを選択します。
    2. 接続名: Apache Kafka 接続の名前を入力します。
    3. (省略可)説明文: 接続の説明を入力します。
    4. (省略可)Cloud Logging を有効にする: 接続のすべてのログデータを保存するには、このチェックボックスをオンにします。
    5. サービス アカウント: Apache Kafka 接続に必要な IAM ロールを持つサービス アカウントを選択します。
    6. Apache Kafka 接続では、[イベントのサブスクリプション、エンティティ、アクションを有効にする] オプションがデフォルトで選択されています。
    7. 型検出スキーム: MessageOnly を選択します。
    8. レジストリ サービス: トピック スキーマの操作に使用される Schema Registry サービス。
    9. レジストリタイプ: 特定のトピックに指定されたスキーマのタイプ。
    10. レジストリ バージョン: 指定したトピックの RegistryUrl から読み取られたスキーマのバージョン。
    11. レジストリ ユーザー: RegistryUrl で指定されたサーバーを承認するユーザー名またはアクセスキーの値。
    12. レジストリ パスワード: RegistryUrl で指定されたサーバーで承認するパスワードと秘密鍵のキー値を含む Secret Manager シークレット。
    13. 必要に応じて、接続ノードの設定を構成します。

      • ノードの最小数: 接続ノードの最小数を入力します。
      • ノードの最大数: 接続ノードの最大数を入力します。

      ノードは、トランザクションを処理する接続の単位(またはレプリカ)です。1 つの接続でより多くのトランザクションを処理するには、より多くのノードが必要になります。逆に、より少ないトランザクションを処理するには、より少ないノードが必要になります。ノードがコネクタの料金に与える影響については、接続ノードの料金をご覧ください。値を入力しない場合は、デフォルトで最小ノード数は 2 に設定され(可用性を高めるため)、最大ノード数は 50 に設定されます。

    14. 必要に応じて、[+ ラベルを追加] をクリックして キーと値ペアの形式でラベルを接続に追加します。
    15. SSL を有効にする: このフィールドは SSL を有効にするかどうかを設定します。
    16. [次へ] をクリックします。
  7. [宛先] セクションに、接続するリモートホスト(バックエンド システム)の詳細を入力します。
    1. 宛先の種類: 宛先の種類を選択します。
      • リストからホストアドレスを選択して、宛先のホスト名または IP アドレスを指定します。
      • バックエンド システムへのプライベート接続を確立する場合は、リストからエンドポイント アタッチメントを選択し、次にエンドポイント アタッチメントリストから必要なエンドポイント アタッチメントを選択します。

      セキュリティをさらに強化してバックエンドシステムへのパブリック接続を確立する場合は、接続用の静的アウトバウンド IP アドレスの構成を検討してから、特定の静的 IP アドレスのみを許可リストに登録するようファイアウォール ルールを構成します。

      他の宛先を入力するには、[+ 宛先を追加] をクリックします。

    2. [次へ] をクリックします。
  8. [認証] セクションで、認証の詳細を入力します。
    1. [認証タイプ] を選択し、関連する詳細を入力します。

      Apache Kafka 接続でサポートされる認証タイプは次のとおりです。

      • ユーザー名とパスワード
        • ユーザー名: 接続に使用する Apache Cassandra ユーザー名。
        • パスワード: Apache Kafka ユーザー名に関連付けられているパスワードを含む Secret Manager のシークレット。
        • Auth Scheme: 認証に使用されるスキーム

          Apache Kafka 接続でサポートされる認証スキームは次のとおりです。

          • Plain
          • SCRAM-SHA-1
          • SCRAM-SHA-256
      • 利用不可

        匿名ログインを使用する場合は、[Not Available] を選択します。

    2. [次へ] をクリックします。
  9. デッドレター構成を入力します。デッドレターを構成すると、指定された Pub/Sub トピックに未処理のイベントが書き込まれます。次の詳細情報を入力します。
    1. デッドレター プロジェクト ID: デッドレター Pub/Sub トピックを構成した Google Cloud プロジェクト ID。
    2. デッドレター トピック: 未処理イベントの詳細を書き込む Pub/Sub トピック。
  10. [次へ] をクリックします。
  11. Review: 接続と認証の詳細を確認します。
  12. [作成] をクリックします。

トリガー出力

Apache Kafka トリガーがイベント サブスクリプションの構成を完了するまでに数分かかります。イベント サブスクリプションのステータスは、トリガー構成ペインの [Event Subscription details] で確認できます。

Apache Kafka トリガーは、次の状態を使用してイベント サブスクリプションのステータスを示します。

  • Creating: トリガーがイベント サブスクリプションに登録中であることを示します。
  • Active: トリガーがイベント サブスクリプションに正常に登録されたことを示します。
  • Error: 構成されたイベント サブスクリプションに問題があることを示します。

[Event Subscription details] セクションには、イベント サブスクリプションのステータスに加えて、接続リージョン、接続名、イベント サブスクリプション名の詳細が表示されます。

トリガー出力変数

イベントごとに、Apache Kafka トリガーは、ダウンストリーム タスクで使用できる ConnectorEventPayload 出力変数(JSON 形式)を生成します。出力変数には、Apache Kafka イベントからのペイロードが含まれます。次に例を示します。

  {
    "message": "{\"key\":\"18\",\"topic\":\"test\",\"value\":\"hello world\",\"partition\":0,\"offset\":12,\"headers\":\"{}\",\"timestamp\":1712755076824,\"serializedMessagesize\":14}",
    "contentType": "text/plain"
 }

イベントサブスクリプションの表示

Integration Connectors で接続に関連付けられているすべてのイベント サブスクリプションの表示と管理を行う手順は、次のとおりです。

  1. [Integration Connectors] > [接続] ページに移動します。

    [接続] ページに移動

  2. サブスクリプションを表示する接続をクリックします。
  3. [イベント サブスクリプション] タブをクリックします。

    ここでは、接続のすべてのサブスクリプションが表示されます。

Apache Kafka トリガーを編集

Apache Kafka トリガーを編集して、接続構成とイベントサブスクリプションの詳細を変更または更新できます。

Apache Kafka トリガーを編集するには、次の操作を行います。

  1. 統合エディタで [Apache Kafka trigger] 要素をクリックして、トリガー構成ペインを開きます。
  2. Apache Kafka トリガーの構成をクリックします。
  3. [Connector Event Trigger Editor] ページで、次の操作を行います。
    1. 構成済みのイベント サブスクリプションを保持するには、[保持] をクリックします。それ以外の場合は、[削除] をクリックします。
    2. 接続の構成とイベントサブスクリプションの詳細を必要に応じて更新します。
    3. [完了] をクリックします。
  4. 更新された接続とイベント サブスクリプションの詳細は、トリガー構成ペインの [Event Subscription details] で確認できます。