Apache Kafka to BigQuery テンプレート

Apache Kafka to BigQuery テンプレートは、Google Cloud Managed Service for Apache Kafka クラスタからテキストデータを取り込み、結果となるレコードを BigQuery テーブルに出力するストリーミング パイプラインです。出力テーブルへのデータの挿入中に発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。

Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。

パイプラインの要件

  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
  • Apache Kafka トピックが存在している必要があります。
  • Dataflow API、BigQuery API、Cloud Storage API を有効にする必要があります。認証が必要な場合は、Secret Manager API も有効にする必要があります。
  • Kafka 入力トピックに適したスキーマを使用して、BigQuery データセットとテーブルを作成します。同じトピックで複数のスキーマを使用していて、複数のテーブルに書き込もうとする場合は、パイプラインを構成する前にテーブルを作成する必要はありません。
  • テンプレートのデッドレター(未処理のメッセージ)キューが有効になっている場合は、デッドレター キューのスキーマがない空のテーブルを作成します。

Kafka メッセージ形式

Apache Kafka to BigQuery テンプレートでは、Kafka から CONFLUENT_AVRO_WIRE_FORMATAVRO_BINARY_FORMATJSON 形式でメッセージを読み取ります。

認証

Apache Kafka to BigQuery テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。

テンプレートのパラメータ

必須パラメータ

  • readBootstrapServerAndTopic: 入力を読み取る Kafka トピック
  • writeMode : 書き込みモード。レコードを 1 つのテーブルまたは複数のテーブルに書き込みます(スキーマに基づく)。DYNAMIC_TABLE_NAMES モードは、AVRO_CONFLUENT_WIRE_FORMAT ソース メッセージ形式と SCHEMA_REGISTRY スキーマソースでのみサポートされます。ターゲット テーブル名は、各メッセージの Avro スキーマ名に基づいて自動生成されます。単一のスキーマ(単一テーブルの作成)または複数のスキーマ(複数テーブルの作成)のいずれかになります。SINGLE_TABLE_NAME モードでは、ユーザーが指定した単一のテーブル(単一のスキーマ)に書き込みます。デフォルトは SINGLE_TABLE_NAME です。
  • kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、証明書ベースの認証の場合は TLS を使用します。APPLICATION_DEFAULT_CREDENTIALS は Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して Google Cloud Apache Kafka for BigQuery クラスタで認証できます。
  • messageFormat: 読み取る Kafka メッセージの形式。サポートされる値は、AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSON です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。
  • useBigQueryDLQ : true の場合、失敗したメッセージは追加のエラー情報とともに BigQuery に書き込まれます。デフォルトは false です。

オプション パラメータ

  • outputTableSpec: 出力の書き込み先となる BigQuery テーブルの場所。名前は <project>:<dataset>.<table_name> の形式にする必要があります。テーブルのスキーマは、入力オブジェクトと一致する必要があります。
  • persistKafkaKey: true の場合、パイプラインは Kafka メッセージキーを BigQuery テーブルの BYTES 型の _key フィールドに保持します。デフォルトは false です(キーは無視されます)。
  • outputProject: データセットが存在する BigQuery 出力プロジェクト。テーブルはデータセット内に動的に作成されます。デフォルトは空です。
  • outputDataset: 出力を書き込む BigQuery 出力データセット。テーブルはデータセット内に動的に作成されます。テーブルを事前に作成しておく場合は、指定された命名規則に準拠するテーブル名である必要があります。名前は bqTableNamePrefix + Avro Schema FullName とし、各単語はハイフン「-」で区切ります。デフォルトは空です。
  • bqTableNamePrefix: BigQuery 出力テーブルの作成時に使用する命名接頭辞。スキーマ レジストリを使用している場合にのみ適用されます。デフォルトは空です。
  • createDisposition: BigQuery CreateDisposition。たとえば、CREATE_IF_NEEDED、CREATE_NEVER などです。デフォルトは CREATE_IF_NEEDED です。
  • writeDisposition: BigQuery WriteDisposition。たとえば、WRITE_APPEND、WRITE_EMPTY、WRITE_TRUNCATE などです。デフォルト: WRITE_APPEND。
  • useAutoSharding: true の場合、BigQuery への書き込み時にパイプラインで自動シャーディングが使用されます。デフォルト値は true です。
  • numStorageWriteApiStreams: 書き込みストリームの数を指定します。このパラメータは必ず設定してください。デフォルトは 0 です。
  • storageWriteApiTriggeringFrequencySec: トリガーの頻度を秒単位で指定します。このパラメータは必ず設定してください。デフォルトは 5 秒です。
  • useStorageWriteApiAtLeastOnce: このパラメータは、BigQuery Storage Write API の使用が有効になっている場合にのみ有効になります。有効になっている場合は、Storage Write API に 1 回以上のセマンティクスが使用され、有効でなければ 1 回限りのセマンティクスが使用されます。デフォルトは false です。
  • enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
  • consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
  • kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
  • kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。デフォルトは空です。
  • kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレットの ID例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス例: gs://your-bucket/keystore.jks。
  • kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードが含まれる Google Cloud Secret Manager シークレット ID。(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • schemaFormat: Kafka スキーマの形式。SINGLE_SCHEMA_FILE または SCHEMA_REGISTRY として提供できます。SINGLE_SCHEMA_FILE が指定されている場合、すべてのメッセージは avro スキーマ ファイルで言及されているスキーマを持つ必要があります。SCHEMA_REGISTRY が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。
  • confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
  • binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryAuthenticationMode: スキーマ レジストリの認証モード。NONE、TLS、OAUTH のいずれかであり、デフォルトは NONE です。
  • schemaRegistryTruststoreLocation: スキーマ レジストリの認証用トラストストアが保存されている SSL 証明書の場所(例: /your-bucket/truststore.jks)。
  • schemaRegistryTruststorePasswordSecretId: トラストストア内のシークレットへのアクセス パスワードが保存されている Secret Manager の SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryKeystoreLocation: SSL 証明書と秘密鍵を含むキーストアの場所(例: /your-bucket/keystore.jks)。
  • schemaRegistryKeystorePasswordSecretId: キーストア ファイルにアクセスするためのパスワードが保存されている Secret Manager の SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryKeyPasswordSecretId: キーストアに保存されているクライアントの秘密鍵にアクセスするために必要なパスワードの SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryOauthClientId: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるクライアント ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
  • schemaRegistryOauthClientSecretId : OAUTH モードでスキーマ レジストリ クライアントの認証に使用するクライアント シークレットを含む Google Cloud Secret Manager のシークレット ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • schemaRegistryOauthScope: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるアクセス トークン スコープ。このフィールドは省略可能であり、スコープ パラメータを渡さずにリクエストを実行できます(例: openid)。
  • schemaRegistryOauthTokenEndpointUrl: OAUTH モードでスキーマ レジストリ クライアントの認証に使用される OAuth/OIDC ID プロバイダの HTTP(S) ベースの URL。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
  • outputDeadletterTable: 失敗したメッセージの完全修飾 BigQuery テーブル名。さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)により出力テーブルに到達できなかったメッセージは、このテーブルに書き込まれます。このテーブルはテンプレートに基づいて作成されます(例: your-project-id:your-dataset.your-table-name)。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が 0 の場合、UDF の再読み込みは無効になります。デフォルト値は 0 です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BIGQUERY_TABLE: BigQuery テーブル名
  • KAFKA_TOPICS: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。
  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • KAFKA_SERVER_ADDRESSES: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例: 35.70.252.199:9092。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping をご覧ください。

詳細については、Dataflow で Kafka から BigQuery にデータを書き込むをご覧ください。

次のステップ