Apache Kafka to BigQuery テンプレート

Apache Kafka to BigQuery テンプレートは、Google Cloud Managed Service for Apache Kafka クラスタからテキストデータを取り込み、結果となるレコードを BigQuery テーブルに出力するストリーミング パイプラインです。出力テーブルへのデータの挿入中に発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。

Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。

パイプラインの要件

  • Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
  • Apache Kafka トピックが存在している必要があります。
  • Dataflow API、BigQuery API、Cloud Storage API を有効にする必要があります。認証が必要な場合は、Secret Manager API も有効にする必要があります。
  • Kafka 入力トピックに適したスキーマを使用して、BigQuery データセットとテーブルを作成します。同じトピックで複数のスキーマを使用していて、複数のテーブルに書き込もうとする場合は、パイプラインを構成する前にテーブルを作成する必要はありません。
  • テンプレートのデッドレター(未処理のメッセージ)キューが有効になっている場合は、デッドレター キューのスキーマがない空のテーブルを作成します。
  • Managed Service for Apache Kafka クラスタに接続する場合は、パイプラインが Dataflow と Managed Service for Apache Kafka を使用するに記載されている要件も満たしている必要があります。

Kafka メッセージ形式

このテンプレートでは、Kafka から次の形式でメッセージを読み取ります。

JSON 形式

JSON メッセージを読み取るには、messageFormat テンプレート パラメータを "JSON" に設定します。

Avro バイナリ エンコード

バイナリ Avro メッセージを読み取るには、次のテンプレート パラメータを設定します。

  • messageFormat: "AVRO_BINARY_ENCODING"
  • binaryAvroSchemaPath: Cloud Storage 内の Avro スキーマ ファイルの場所。例: gs://BUCKET_NAME/message-schema.avsc

Avro バイナリ形式の詳細については、Apache Avro ドキュメントの Binary Encoding をご覧ください。

Confluent Schema Registry でエンコードされた Avro

Confluent Schema Registry でエンコードされた Avro のメッセージを読み取るには、次のテンプレート パラメータを設定します。

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT"

  • schemaFormat: 次のいずれかの値。
    • "SINGLE_SCHEMA_FILE": メッセージ スキーマは Avro スキーマ ファイルで定義されます。confluentAvroSchemaPath パラメータに、スキーマ ファイルがある Cloud Storage の場所を指定します。
    • "SCHEMA_REGISTRY": メッセージは Confluent Schema Registry を使用してエンコードされます。schemaRegistryConnectionUrl パラメータに Confluent Schema Registry インスタンスの URL を指定し、schemaRegistryAuthenticationMode パラメータに認証モードを指定します。

この形式の詳細については、Confluent のドキュメントの Wire format をご覧ください。

認証

Apache Kafka to BigQuery テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。

テンプレートのパラメータ

必須パラメータ

  • readBootstrapServerAndTopic: 入力を読み取る Kafka トピック。
  • writeMode: レコードを 1 つのテーブルまたは複数のテーブルに書き込みます(スキーマに基づく)。DYNAMIC_TABLE_NAMES モードは、AVRO_CONFLUENT_WIRE_FORMAT ソース メッセージ形式と SCHEMA_REGISTRY スキーマソースでのみサポートされます。ターゲット テーブル名は、各メッセージの Avro スキーマ名に基づいて自動生成されます。単一のスキーマ(単一テーブルの作成)または複数のスキーマ(複数テーブルの作成)のいずれかになります。SINGLE_TABLE_NAME モードでは、ユーザーが指定した単一のテーブル(単一のスキーマ)に書き込みます。デフォルトは SINGLE_TABLE_NAME です。
  • kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は KafkaAuthenticationMethod.NONE、SASL / PLAIN のユーザー名とパスワードの場合は KafkaAuthenticationMethod.SASL_PLAIN、SASL_SCRAM_512 認証の場合は KafkaAuthenticationMethod.SASL_SCRAM_512、証明書ベースの認証の場合は KafkaAuthenticationMethod.TLS を使用します。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS は Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して認証できます。
  • messageFormat: 読み取る Kafka メッセージの形式。サポートされている値は、AVRO_CONFLUENT_WIRE_FORMAT(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING(プレーンなバイナリ Avro)、JSON です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。
  • useBigQueryDLQ: true の場合、失敗したメッセージは追加のエラー情報とともに BigQuery に書き込まれます。デフォルトは false です。

オプション パラメータ

  • outputTableSpec: 出力の書き込み先となる BigQuery テーブルの場所。名前は <project>:<dataset>.<table_name> の形式にする必要があります。テーブルのスキーマは、入力オブジェクトと一致する必要があります。
  • persistKafkaKey: true の場合、パイプラインは Kafka メッセージキーを BigQuery テーブルの BYTES 型の _key フィールドに保持します。デフォルトは false です(キーは無視されます)。
  • outputProject: データセットが存在する BigQuery 出力プロジェクト。テーブルはデータセット内に動的に作成されます。デフォルトは空です。
  • outputDataset: 出力を書き込む BigQuery 出力データセット。テーブルはデータセット内に動的に作成されます。テーブルを事前に作成しておく場合は、指定された命名規則に準拠するテーブル名である必要があります。名前は bqTableNamePrefix + Avro Schema FullName とし、各単語はハイフン - で区切ります。デフォルトは空です。
  • bqTableNamePrefix: BigQuery 出力テーブルの作成時に使用する命名接頭辞。スキーマ レジストリを使用している場合にのみ適用されます。デフォルトは空です。
  • createDisposition: BigQuery CreateDisposition。たとえば、CREATE_IF_NEEDEDCREATE_NEVER などです。デフォルトは CREATE_IF_NEEDED です。
  • writeDisposition: BigQuery WriteDisposition。たとえば、WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE などです。デフォルトは WRITE_APPEND です。
  • useAutoSharding: true の場合、BigQuery への書き込み時にパイプラインで自動シャーディングが使用されます。デフォルト値は true です。
  • numStorageWriteApiStreams: 書き込みストリームの数を指定します。このパラメータは必ず設定してください。デフォルトは 0 です。
  • storageWriteApiTriggeringFrequencySec: トリガーの頻度を秒単位で指定します。このパラメータは必ず設定してください。デフォルトは 5 秒です。
  • useStorageWriteApiAtLeastOnce: このパラメータは、BigQuery Storage Write API の使用が有効になっている場合にのみ有効になります。有効になっている場合は、Storage Write API に 1 回以上のセマンティクスが使用され、有効でなければ 1 回限りのセマンティクスが使用されます。デフォルトは false です。
  • enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
  • consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
  • kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
  • kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例: gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId: SASL_SCRAM 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId: SASL_SCRAM 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadSaslScramTruststorePasswordSecretId: Kafka SASL_SCRAM 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat: Kafka スキーマの形式。SINGLE_SCHEMA_FILE または SCHEMA_REGISTRY として指定できます。SINGLE_SCHEMA_FILE が指定されている場合は、すべてのメッセージに avro スキーマ ファイルで言及されているスキーマを使用します。SCHEMA_REGISTRY が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。
  • confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
  • binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
  • schemaRegistryAuthenticationMode: Schema Registry 認証モード。NONE、TLS、OAUTH のいずれかです。デフォルトは NONE です。
  • schemaRegistryTruststoreLocation: スキーマ レジストリの認証用トラストストアが保存されている SSL 証明書の場所。例: /your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId: トラストストア内のシークレットへのアクセス パスワードが保存されている Secret Manager の SecretId。例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation: SSL 証明書と秘密鍵を含むキーストアの場所。例: /your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId: キーストア ファイルにアクセスするためのパスワードが保存されている Secret Manager の SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryKeyPasswordSecretId: キーストアに保存されているクライアントの秘密鍵にアクセスするために必要なパスワードの SecretId(例: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version)。
  • schemaRegistryOauthClientId: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるクライアント ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
  • schemaRegistryOauthClientSecretId : OAUTH モードでスキーマ レジストリ クライアントの認証に使用するクライアント シークレットを含む Google Cloud Secret Manager のシークレット ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるアクセス トークン スコープ。このフィールドは省略可能であり、スコープ パラメータを渡さずにリクエストを実行できます。例: openid
  • schemaRegistryOauthTokenEndpointUrl: OAUTH モードでスキーマ レジストリ クライアントの認証に使用される OAuth/OIDC ID プロバイダの HTTP(S) ベースの URL。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
  • outputDeadletterTable: 失敗したメッセージの完全修飾 BigQuery テーブル名。さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)により出力テーブルに到達できなかったメッセージは、このテーブルに書き込まれます。このテーブルはテンプレートに基づいて作成されます。例: your-project-id:your-dataset.your-table-name
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例: gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が 0 の場合、UDF の再読み込みは無効になります。デフォルト値は 0 です。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートにユーザー定義関数を作成するをご覧ください。

このテンプレートは、JSON 形式の Kafka メッセージでのみ UDF をサポートしています。Kafka メッセージで Avro 形式が使用されている場合、UDF は呼び出されません。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: JSON 文字列としてシリアル化された Kafka レコードの値
  • 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスとトピック

    ブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。

    • Managed Service for Apache Kafka クラスタ: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka クラスタ: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: BigQuery データセットの名前
  • TABLE_NAME: BigQuery 出力テーブルの名前
  • ERROR_TABLE_NAME: エラーレコードを書き込む BigQuery テーブルの名前

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "writeMode": "SINGLE_TABLE_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME",
          "useBigQueryDLQ": "true",
          "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスとトピック

    ブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。

    • Managed Service for Apache Kafka クラスタ: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka クラスタ: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: BigQuery データセットの名前
  • TABLE_NAME: BigQuery 出力テーブルの名前
  • ERROR_TABLE_NAME: エラーレコードを書き込む BigQuery テーブルの名前

詳細については、Dataflow を使用して Kafka から BigQuery にデータを書き込むをご覧ください。

次のステップ