Apache Kafka to BigQuery テンプレートは、Google Cloud Managed Service for Apache Kafka クラスタからテキストデータを取り込み、結果となるレコードを BigQuery テーブルに出力するストリーミング パイプラインです。出力テーブルへのデータの挿入中に発生したエラーは、BigQuery の別のエラーテーブルに挿入されます。
Apache Kafka to BigQuery テンプレートは、セルフマネージドまたは外部の Kafka でも使用できます。
パイプラインの要件
- Apache Kafka ブローカー サーバーが動作していて Dataflow ワーカーマシンから到達可能である必要があります。
- Apache Kafka トピックが存在している必要があります。
- Dataflow API、BigQuery API、Cloud Storage API を有効にする必要があります。認証が必要な場合は、Secret Manager API も有効にする必要があります。
- Kafka 入力トピックに適したスキーマを使用して、BigQuery データセットとテーブルを作成します。同じトピックで複数のスキーマを使用していて、複数のテーブルに書き込もうとする場合は、パイプラインを構成する前にテーブルを作成する必要はありません。
- テンプレートのデッドレター(未処理のメッセージ)キューが有効になっている場合は、デッドレター キューのスキーマがない空のテーブルを作成します。
- Managed Service for Apache Kafka クラスタに接続する場合は、パイプラインが Dataflow と Managed Service for Apache Kafka を使用するに記載されている要件も満たしている必要があります。
Kafka メッセージ形式
このテンプレートでは、Kafka から次の形式でメッセージを読み取ります。
JSON 形式
JSON メッセージを読み取るには、messageFormat
テンプレート パラメータを "JSON"
に設定します。
Avro バイナリ エンコード
バイナリ Avro メッセージを読み取るには、次のテンプレート パラメータを設定します。
messageFormat
:"AVRO_BINARY_ENCODING"
binaryAvroSchemaPath
: Cloud Storage 内の Avro スキーマ ファイルの場所。例:gs://BUCKET_NAME/message-schema.avsc
Avro バイナリ形式の詳細については、Apache Avro ドキュメントの Binary Encoding をご覧ください。
Confluent Schema Registry でエンコードされた Avro
Confluent Schema Registry でエンコードされた Avro のメッセージを読み取るには、次のテンプレート パラメータを設定します。
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
schemaFormat
: 次のいずれかの値。"SINGLE_SCHEMA_FILE"
: メッセージ スキーマは Avro スキーマ ファイルで定義されます。confluentAvroSchemaPath
パラメータに、スキーマ ファイルがある Cloud Storage の場所を指定します。-
"SCHEMA_REGISTRY"
: メッセージは Confluent Schema Registry を使用してエンコードされます。schemaRegistryConnectionUrl
パラメータに Confluent Schema Registry インスタンスの URL を指定し、schemaRegistryAuthenticationMode
パラメータに認証モードを指定します。
この形式の詳細については、Confluent のドキュメントの Wire format をご覧ください。
認証
Apache Kafka to BigQuery テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証をサポートしています。
テンプレートのパラメータ
必須パラメータ
- readBootstrapServerAndTopic: 入力を読み取る Kafka トピック。
- writeMode: レコードを 1 つのテーブルまたは複数のテーブルに書き込みます(スキーマに基づく)。
DYNAMIC_TABLE_NAMES
モードは、AVRO_CONFLUENT_WIRE_FORMAT
ソース メッセージ形式とSCHEMA_REGISTRY
スキーマソースでのみサポートされます。ターゲット テーブル名は、各メッセージの Avro スキーマ名に基づいて自動生成されます。単一のスキーマ(単一テーブルの作成)または複数のスキーマ(複数テーブルの作成)のいずれかになります。SINGLE_TABLE_NAME
モードでは、ユーザーが指定した単一のテーブル(単一のスキーマ)に書き込みます。デフォルトはSINGLE_TABLE_NAME
です。 - kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は
KafkaAuthenticationMethod.NONE
、SASL / PLAIN のユーザー名とパスワードの場合はKafkaAuthenticationMethod.SASL_PLAIN
、SASL_SCRAM_512 認証の場合はKafkaAuthenticationMethod.SASL_SCRAM_512
、証明書ベースの認証の場合はKafkaAuthenticationMethod.TLS
を使用します。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
は Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して認証できます。 - messageFormat: 読み取る Kafka メッセージの形式。サポートされている値は、
AVRO_CONFLUENT_WIRE_FORMAT
(Confluent Schema Registry でエンコードされた Avro)、AVRO_BINARY_ENCODING
(プレーンなバイナリ Avro)、JSON
です。デフォルトは AVRO_CONFLUENT_WIRE_FORMAT です。 - useBigQueryDLQ: true の場合、失敗したメッセージは追加のエラー情報とともに BigQuery に書き込まれます。デフォルトは false です。
オプション パラメータ
- outputTableSpec: 出力の書き込み先となる BigQuery テーブルの場所。名前は
<project>:<dataset>.<table_name>
の形式にする必要があります。テーブルのスキーマは、入力オブジェクトと一致する必要があります。 - persistKafkaKey: true の場合、パイプラインは Kafka メッセージキーを BigQuery テーブルの
BYTES
型の_key
フィールドに保持します。デフォルトはfalse
です(キーは無視されます)。 - outputProject: データセットが存在する BigQuery 出力プロジェクト。テーブルはデータセット内に動的に作成されます。デフォルトは空です。
- outputDataset: 出力を書き込む BigQuery 出力データセット。テーブルはデータセット内に動的に作成されます。テーブルを事前に作成しておく場合は、指定された命名規則に準拠するテーブル名である必要があります。名前は
bqTableNamePrefix + Avro Schema FullName
とし、各単語はハイフン-
で区切ります。デフォルトは空です。 - bqTableNamePrefix: BigQuery 出力テーブルの作成時に使用する命名接頭辞。スキーマ レジストリを使用している場合にのみ適用されます。デフォルトは空です。
- createDisposition: BigQuery CreateDisposition。たとえば、
CREATE_IF_NEEDED
、CREATE_NEVER
などです。デフォルトは CREATE_IF_NEEDED です。 - writeDisposition: BigQuery WriteDisposition。たとえば、
WRITE_APPEND
、WRITE_EMPTY
、WRITE_TRUNCATE
などです。デフォルトは WRITE_APPEND です。 - useAutoSharding: true の場合、BigQuery への書き込み時にパイプラインで自動シャーディングが使用されます。デフォルト値は
true
です。 - numStorageWriteApiStreams: 書き込みストリームの数を指定します。このパラメータは必ず設定してください。デフォルトは
0
です。 - storageWriteApiTriggeringFrequencySec: トリガーの頻度を秒単位で指定します。このパラメータは必ず設定してください。デフォルトは 5 秒です。
- useStorageWriteApiAtLeastOnce: このパラメータは、BigQuery Storage Write API の使用が有効になっている場合にのみ有効になります。有効になっている場合は、Storage Write API に 1 回以上のセマンティクスが使用され、有効でなければ 1 回限りのセマンティクスが使用されます。デフォルトは false です。
- enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
- consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
- kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
- kafkaReadUsernameSecretId:
SASL_PLAIN
認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaReadPasswordSecretId:
SASL_PLAIN
認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例:
gs://your-bucket/keystore.jks
- kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramUsernameSecretId:
SASL_SCRAM
認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramPasswordSecretId:
SASL_SCRAM
認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadSaslScramTruststorePasswordSecretId: Kafka SASL_SCRAM 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- schemaFormat: Kafka スキーマの形式。
SINGLE_SCHEMA_FILE
またはSCHEMA_REGISTRY
として指定できます。SINGLE_SCHEMA_FILE
が指定されている場合は、すべてのメッセージに avro スキーマ ファイルで言及されているスキーマを使用します。SCHEMA_REGISTRY
が指定されている場合、メッセージは 1 つのスキーマまたは複数のスキーマを持つことができます。デフォルトは SINGLE_SCHEMA_FILE です。 - confluentAvroSchemaPath: トピック内のすべてのメッセージをデコードするために使用される単一の Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
- schemaRegistryConnectionUrl: メッセージのデコード用に Avro スキーマを管理するために使用される Confluent Schema Registry のインスタンスの URL。デフォルトは空です。
- binaryAvroSchemaPath: バイナリでエンコードされた Avro メッセージをデコードするために使用される Avro スキーマ ファイルの Google Cloud Storage パス。デフォルトは空です。
- schemaRegistryAuthenticationMode: Schema Registry 認証モード。NONE、TLS、OAUTH のいずれかです。デフォルトは NONE です。
- schemaRegistryTruststoreLocation: スキーマ レジストリの認証用トラストストアが保存されている SSL 証明書の場所。例:
/your-bucket/truststore.jks
- schemaRegistryTruststorePasswordSecretId: トラストストア内のシークレットへのアクセス パスワードが保存されている Secret Manager の SecretId。例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
- schemaRegistryKeystoreLocation: SSL 証明書と秘密鍵を含むキーストアの場所。例:
/your-bucket/keystore.jks
- schemaRegistryKeystorePasswordSecretId: キーストア ファイルにアクセスするためのパスワードが保存されている Secret Manager の SecretId(例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
)。 - schemaRegistryKeyPasswordSecretId: キーストアに保存されているクライアントの秘密鍵にアクセスするために必要なパスワードの SecretId(例:
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
)。 - schemaRegistryOauthClientId: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるクライアント ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
- schemaRegistryOauthClientSecretId : OAUTH モードでスキーマ レジストリ クライアントの認証に使用するクライアント シークレットを含む Google Cloud Secret Manager のシークレット ID。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- schemaRegistryOauthScope: OAUTH モードでスキーマ レジストリ クライアントを認証するために使用されるアクセス トークン スコープ。このフィールドは省略可能であり、スコープ パラメータを渡さずにリクエストを実行できます。例:
openid
- schemaRegistryOauthTokenEndpointUrl: OAUTH モードでスキーマ レジストリ クライアントの認証に使用される OAuth/OIDC ID プロバイダの HTTP(S) ベースの URL。AVRO_CONFLUENT_WIRE_FORMAT メッセージ形式で必要となります。
- outputDeadletterTable: 失敗したメッセージの完全修飾 BigQuery テーブル名。さまざまな理由(スキーマの不一致、JSON の形式の誤りなど)により出力テーブルに到達できなかったメッセージは、このテーブルに書き込まれます。このテーブルはテンプレートに基づいて作成されます。例:
your-project-id:your-dataset.your-table-name
- javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。例:
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。 - javascriptTextTransformReloadIntervalMinutes: UDF を再読み込みする頻度を指定します(分単位)。値が 0 より大きい場合、Dataflow は Cloud Storage 内の UDF ファイルを定期的にチェックし、ファイルが変更された場合は UDF を再読み込みします。このパラメータを使用すると、パイプラインの実行中にジョブを再起動することなく、UDF を更新できます。値が
0
の場合、UDF の再読み込みは無効になります。デフォルト値は0
です。
ユーザー定義関数
必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートにユーザー定義関数を作成するをご覧ください。
このテンプレートは、JSON 形式の Kafka メッセージでのみ UDF をサポートしています。Kafka メッセージで Avro 形式が使用されている場合、UDF は呼び出されません。関数の仕様
UDF の仕様は次のとおりです。
- 入力: JSON 文字列としてシリアル化された Kafka レコードの値
- 出力: BigQuery 宛先テーブルのスキーマに一致する JSON 文字列
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to BigQuery template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ writeMode=SINGLE_TABLE_NAME,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka ブートストラップ サーバーのアドレスとトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
DATASET_NAME
: BigQuery データセットの名前TABLE_NAME
: BigQuery 出力テーブルの名前ERROR_TABLE_NAME
: エラーレコードを書き込む BigQuery テーブルの名前
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "writeMode": "SINGLE_TABLE_NAME", "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME", "useBigQueryDLQ": "true", "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka ブートストラップ サーバーのアドレスとトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
DATASET_NAME
: BigQuery データセットの名前TABLE_NAME
: BigQuery 出力テーブルの名前ERROR_TABLE_NAME
: エラーレコードを書き込む BigQuery テーブルの名前
詳細については、Dataflow を使用して Kafka から BigQuery にデータを書き込むをご覧ください。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。