Apache Kafka

Apache Kafka コネクタを使用すると、Apache Kafka データベースに対して、挿入、削除、更新、読み取りオペレーションを実行できます。

サポート対象のバージョン

Apache Kafka コネクタは、ネイティブ クライアント ライブラリを利用して特定の Kafka クラスタへの接続を確立します。このコネクタは、クライアント バージョン 3.3.1 で動作します。ただし、このコネクタはバージョン 3.0~3.3.1 の Kafka クラスタへの接続を確立できます。

始める前に

Apache Kafka コネクタを使用する前に、次の作業を行います。

  • Google Cloud プロジェクトで次の操作を行います。
    • コネクタを構成するユーザーに roles/connectors.admin IAM ロールを付与します。
    • コネクタに使用するサービス アカウントに、次の IAM ロールを付与します。
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor

      サービス アカウントは特別なタイプの Google アカウントで、Google API のデータにアクセスするのに認証を受ける必要がある人間以外のユーザーを表します。サービス アカウントがない場合は、サービス アカウントを作成する必要があります。詳細については、サービス アカウントを作成するをご覧ください。

    • 次のサービスを有効にします。
      • secretmanager.googleapis.com(Secret Manager API)
      • connectors.googleapis.com(Connectors API)

      サービスを有効にする方法については、サービスを有効にするをご覧ください。

    以前にプロジェクトでこうしたサービスを有効にしていない場合は、コネクタを構成するときにそれを有効にすることを求められます。

コネクタを構成する

コネクタを構成するには、データソース(バックエンド システム)への接続を作成する必要があります。接続はデータソースに特有です。つまり、多数のデータソースがある場合は、データソースごとに別々の接続を作成する必要があります。接続を作成する手順は次のとおりです。

  1. Cloud コンソールで、[Integration Connectors] > [接続] ページに移動し、Google Cloud プロジェクトを選択または作成します。

    [接続] ページに移動

  2. [+ 新規作成] をクリックして [接続の作成] ページを開きます。
  3. [ロケーション] ステップで、新しい Apache Kafka 接続のロケーションを次のように選択します。
    1. リージョン: リストからリージョンを選択します。
    2. [次へ] をクリックします。
  4. [接続の詳細] ステップで、新しい Apache Kafka 接続の詳細を入力します。
    1. コネクタのバージョン: リストから、使用可能な Apache Kafka コネクタのバージョンを選択します。
    2. 接続名: Apache Kafka 接続の名前を入力します。
    3. (省略可)説明文: 接続の説明を入力します。
    4. (省略可)Cloud Logging を有効にする: 接続のすべてのログデータを保存するには、このチェックボックスをオンにします。
    5. サービス アカウント: Apache Kafka 接続に必要な IAM ロールを持つサービス アカウントを選択します。
    6. Apache Kafka 接続では、[イベントのサブスクリプション、エンティティ、アクションを有効にする] オプションがデフォルトで選択されています。
    7. タイプ検出スキーム: MessageOnly を選択します。
    8. レジストリ サービス: トピック スキーマの操作に使用される Schema Registry サービス。
    9. レジストリタイプ: 特定のトピックに指定されたスキーマのタイプ。
    10. レジストリ バージョン: 指定したトピックの RegistryUrl から読み取られたスキーマのバージョン。
    11. レジストリ ユーザー: RegistryUrl で指定されたサーバーを承認するユーザー名またはアクセスキーの値。
    12. レジストリ パスワード: RegistryUrl で指定されたサーバーで承認するパスワードと秘密鍵のキー値を含む Secret Manager シークレット。
    13. 必要に応じて、接続ノードの設定を構成します。

      • ノードの最小数: 接続ノードの最小数を入力します。
      • ノードの最大数: 接続ノードの最大数を入力します。

      ノードは、トランザクションを処理する接続の単位(またはレプリカ)です。1 つの接続でより多くのトランザクションを処理するには、より多くのノードが必要になります。逆に、より少ないトランザクションを処理するには、より少ないノードが必要になります。ノードがコネクタの料金に与える影響については、接続ノードの料金をご覧ください。値を入力しない場合は、デフォルトで最小ノード数は 2 に設定され(可用性を高めるため)、最大ノード数は 50 に設定されます。

    14. 必要に応じて、[+ ラベルを追加] をクリックして キーと値ペアの形式でラベルを接続に追加します。
    15. SSL を有効にする: このフィールドは SSL を有効にするかどうかを設定します。
    16. [次へ] をクリックします。
  5. [宛先] セクションに、接続するリモートホスト(バックエンド システム)の詳細を入力します。
    1. 宛先の種類: 宛先の種類を選択します。
      • リストから [ホストアドレス] を選択し、宛先のホスト名または IP アドレスを指定します。
      • バックエンド システムへのプライベート接続を確立する場合は、リストからエンドポイント アタッチメントを選択し、次にエンドポイント アタッチメントリストから必要なエンドポイント アタッチメントを選択します。

      セキュリティをさらに強化してバックエンドシステムへのパブリック接続を確立する場合は、接続用の静的アウトバウンド IP アドレスの構成を検討してから、特定の静的 IP アドレスのみを許可リストに登録するようファイアウォール ルールを構成します。

      他の宛先を入力するには、[+ 宛先を追加] をクリックします。

    2. [次へ] をクリックします。
  6. [認証] セクションで、認証の詳細を入力します。
    1. [認証タイプ] を選択し、関連する詳細を入力します。

      Apache Kafka 接続でサポートされる認証タイプは次のとおりです。

      • ユーザー名とパスワード
        • ユーザー名: 接続に使用する Apache Cassandra ユーザー名。
        • パスワード: Apache Kafka ユーザー名に関連付けられているパスワードを含む Secret Manager のシークレット。
        • Auth Scheme: 認証に使用されるスキーム

          Apache Kafka 接続でサポートされる認証スキームは次のとおりです。

          • Plain
          • SCRAM-SHA-1
          • SCRAM-SHA-256
      • 利用不可

        匿名ログインを使用する場合は、[Not Available] を選択します。

    2. [次へ] をクリックします。
  7. デッドレター構成を入力します。デッドレターを構成すると、指定された Pub/Sub トピックに未処理のイベントが書き込まれます。次の詳細情報を入力します。
    1. デッドレター プロジェクト ID: デッドレター Pub/Sub トピックを構成した Google Cloud プロジェクト ID。
    2. デッドレター トピック: 未処理イベントの詳細を書き込む Pub/Sub トピック。
  8. [次へ] をクリックします。
  9. Review: 接続と認証の詳細を確認します。
  10. [作成] をクリックします。

エンティティ、オペレーション、アクション

すべての Integration Connectors が、接続されたアプリケーションのオブジェクトを抽象化するレイヤを提供します。アプリケーションのオブジェクトには、この抽象化を通じてのみアクセスできます。抽象化は、エンティティ、オペレーション、アクションとして公開されます。

  • エンティティ: エンティティは、接続されているアプリケーションやサービスのオブジェクト、またはプロパティのコレクションと考えることができます。エンティティの定義は、コネクタによって異なります。たとえば、データベース コネクタでは、テーブルがエンティティであり、ファイル サーバー コネクタでは、フォルダがエンティティです。また、メッセージング システム コネクタでは、キューがエンティティです。

    ただし、コネクタでいずれのエンティティもサポートされていない、またはエンティティが存在しない可能性があります。その場合、Entities リストは空になります。

  • オペレーション: エンティティに対して行うことができるアクティビティです。エンティティに対して次のいずれかのオペレーションを行うことができます。

    使用可能なリストからエンティティを選択すると、そのエンティティで使用可能なオペレーションのリストが生成されます。オペレーションの詳細については、コネクタタスクのエンティティ オペレーションをご覧ください。ただし、コネクタがいずれかのエンティティ オペレーションをサポートしていない場合、サポートされていないオペレーションは Operations リストに含まれません。

  • アクション: コネクタ インターフェースを介して統合で使用できる主要な関数の一つです。アクションを使用すると、1 つまたは複数のエンティティに対して変更を加えることができます。また、使用できるアクションはコネクタごとに異なります。通常、アクションには入力パラメータと出力パラメータがあります。ただし、コネクタがどのアクションもサポートしていない可能性があります。その場合は、Actions リストが空になります。

システムの上限

Apache コネクタは、ノードごとに 1 秒あたり最大 50 件のトランザクションを処理することができ、この上限を超えるトランザクションはすべてthrottlesされます。デフォルトでは、Integration Connectors は、接続に 2 つのノードを割り当てます(可用性を高めるため)。

Integration Connectors に適用される上限の詳細については、上限をご覧ください。

操作

PublishMessage アクション

このアクションは、Apache Kafka トピックにメッセージをパブリッシュします。次の表に、PublishMessage アクションの入出力パラメータを示します。

入力パラメータ

パラメータ名 必須 データ型 説明
トピック 文字列 メッセージをパブリッシュするトピックの名前。
パーティション × 文字列 メッセージが割り当てられているパーティション。値は、指定されたトピックに対して有効である必要があります。この値を設定しない場合、ネイティブ クライアントによって自動的に設定されます。
キー × 文字列 メッセージキー。
メッセージ 文字列 パブリッシュするメッセージ。メッセージは文字列型 JSON である必要があり、サポートされている最大メッセージ サイズは 10 MB です。
HasBytes × ブール値 メッセージがバイナリ形式かどうかを指定します。
MessageBytes × 文字列 Base64 でエンコードされた文字列形式のメッセージ。
検証 × ブール値 パブリッシュするメッセージを、トピックのスキーマ レジストリで定義されたメッセージ スキーマと照らし合わせて検証する必要があるかどうかを指定します。接続の作成時にスキーマ レジストリを指定した場合、その後、レジストリからのトピックのスキーマ定義が検証の目的で使用されます。このフィールドのデフォルト値は false です。

出力パラメータ

パラメータ名 データ型 説明
PartitionWritten 整数 メッセージが書き込まれたパーティション。
OffsetWritten 長い メッセージが書き込まれたパーティション内の位置。
TimestampWritten 長い メッセージがパーティションに commit された時刻(Unix タイムスタンプ)。
KeyWritten 文字列 書き込まれたメッセージ キーの値。メッセージの書き込み時にメッセージ キーが指定されていない場合、値は NULL になります。
完了 ブール値 メッセージが公開されたかどうかを指定します。

PublishMessage アクションのレスポンスの例を次に示します。

{Success: true,
PartitionWritten: 1,
OffsetWritten: 22301,
KeyWritten: "dGVzdA==",
TimestampWritten: 1690806748}

Confluent Cloud の構成

Confluent Cloud の構成は、以前に Apache Kafka のために記載した手順とは少し異なります。Confluent Cloud 用の接続の作成時には、次の点を考慮してください。

  • Confluent Cloud クラスタ API キーはユーザー名として使用され、キーの Secret Manager の Secret は、ブートストラップ サーバーに接続するためのパスワードとして使用されます。API キーをまだ作成していない場合は、Confluent Cloud でそれを作成する必要があります。
  • [Connection Details] セクションで [SSL を使用する] を選択します。
  • スキーマ レジストリを使用している場合は、次の値を構成します。
    • Connection Details セクションで次の操作を行います。
      • レジストリ バージョン: レジストリのバージョン番号を入力します。最新バージョンを使用する場合は、latest と入力します。
      • レジストリ ユーザー: スキーマ レジストリ API キーを入力します。スキーマ レジストリ API キーがまだない場合は、それを作成する必要があります。
      • レジストリ パスワード: レジストリ パスワードの Secret Manager の Secret を入力します。
      • Secret のバージョン: Secret のバージョン番号を選択します。
      • レジストリ タイプ: Confluent を選択します。
      • タイプ検出スキーム: MessageOnly を選択します。
    • Destinations セクションのホスト名フィールドにレジストリ URL を入力します。

    Terraform を使用して接続を作成する

    Terraform リソースを使用して、新しい接続を作成できます。

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    接続作成用の Terraform テンプレートのサンプルを表示するには、サンプル テンプレートをご覧ください。

    Terraform を使用してこの接続を作成する場合は、Terraform 構成ファイルで次の変数を設定する必要があります。

    パラメータ名 データ型 必須 説明
    type_detection_scheme ENUM True Apache Kafka ブローカーでの認証に使用されるスキーム。サポートされている値は MessageOnly です。
    registry_service ENUM False トピック スキーマの操作に使用される Schema Registry サービス。サポートされている値は Confluent です。
    registry_type ENUM False 特定のトピックに指定されたスキーマのタイプ。サポートされている値は、AVRO、JSON です。
    registry_version STRING False 指定したトピックの RegistryUrl から読み取られたスキーマのバージョン。レジストリ バージョンの有効な値は [1,2^31-1] または文字列「latest」です。これは、最後に登録されたスキーマを返します。
    registry_user STRING False RegistryUrl で指定されたサーバーを承認するユーザー名。
    registry_password SECRET False RegistryUrl で指定されたサーバーで承認するパスワードと秘密鍵のキー値を含む Secret Manager の Secret。
    usessl BOOLEAN False このフィールドは SSL を有効にするかどうかを設定します。

    統合で Apache Kafka 接続を使用する

    接続を作成すると、Apigee Integration と Application Integration の両方で使用できるようになります。この接続は、コネクタタスクを介して統合で使用できます。

    • Apigee Integration で Connectors タスクを作成して使用する方法については、Connectors タスクをご覧ください。
    • Application Integration で Connectors タスクを作成して使用する方法については、Connectors タスクをご覧ください。

    Google Cloud コミュニティの助けを借りる

    Google Cloud コミュニティの Cloud フォーラムで質問を投稿したり、このコネクタについてディスカッションしたりできます。

    次のステップ