このページでは、Dataflow を使用して Google Cloud Managed Service for Apache Kafka からデータを読み取り、レコードを BigQuery テーブルに書き込む方法について説明します。このチュートリアルでは、Apache Kafka to BigQuery テンプレートを使用して Dataflow ジョブを作成します。
概要
Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。Kafka は分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Dataflow を使用すると、Kafka からイベントを読み取って処理し、結果をさらに分析するために BigQuery テーブルに書き込むことができます。
Managed Service for Apache Kafka は、安全でスケーラブルな Kafka クラスタの実行に役立つ Google Cloud サービスです。

必要な権限
Dataflow ワーカー サービス アカウントには、次の Identity and Access Management(IAM)ロールが必要です。
- マネージド Kafka クライアント(
roles/managedkafka.client
) - BigQuery データ編集者(
roles/bigquery.dataEditor
)
詳細については、Dataflow のセキュリティと権限をご覧ください。
Kafka クラスタを作成する
このステップでは、Managed Service for Apache Kafka クラスタを作成します。詳細については、Managed Service for Apache Kafka クラスタを作成するをご覧ください。
コンソール
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
[作成] をクリックします。
[クラスタ名] フィールドに、クラスタの名前を入力します。
[リージョン] リストで、クラスタのロケーションを選択します。
[作成] をクリックします。
gcloud
managed-kafka clusters create
コマンドを実行します。
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
次のように置き換えます。
CLUSTER
: クラスタの名前REGION
: サブネットを作成したリージョンPROJECT_ID
: プロジェクト IDSUBNET_NAME
: クラスタをデプロイするサブネット
通常、クラスタの作成には 20~30 分かかります。
Kafka トピックを作成する
Managed Service for Apache Kafka クラスタを作成したら、トピックを作成します。
コンソール
[Managed Service for Apache Kafka] > [クラスタ] ページに移動します。
クラスタの名前をクリックします。
クラスタの詳細ページで、[トピックを作成] をクリックします。
[トピック名] ボックスに、トピックの名前を入力します。
[作成] をクリックします。
gcloud
managed-kafka topics create
コマンドを実行します。
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
次のように置き換えます。
TOPIC_NAME
: 作成するトピックの名前
BigQuery テーブルを作成する
この手順では、次のスキーマを持つ BigQuery テーブルを作成します。
列名 | データ型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
BigQuery データセットがまだない場合は、まず作成します。詳細については、データセットの作成をご覧ください。次に、新しい空のテーブルを作成します。
コンソール
[BigQuery] ページに移動します。
[エクスプローラ] ペインでプロジェクトを開き、データセットを選択します。
[データセット情報] セクションで、[
テーブルを作成] をクリックします。[テーブルの作成元] リストで [空のテーブル] を選択します。
[テーブル] ボックスにテーブルの名前を入力します。
[スキーマ] セクションで [テキストとして編集] をクリックします。
次のスキーマ定義を貼り付けます。
name:STRING, customer_id:INTEGER
[テーブルを作成] をクリックします。
gcloud
bq mk
コマンドを使用します。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
次のように置き換えます。
PROJECT_ID
: プロジェクト IDDATASET_NAME
: データセットの名前。TABLE_NAME
: 作成するテーブルの名前。
Dataflow ジョブを実行する
Kafka クラスタと BigQuery テーブルを作成したら、Dataflow テンプレートを実行します。
コンソール
まず、クラスタのブートストラップ サーバー アドレスを取得します。
Google Cloud コンソールで、[クラスタ] ページに移動します。
クラスタ名をクリックします。
[構成] タブをクリックします。
[ブートストラップ URL] からブートストラップ サーバーのアドレスをコピーします。
次に、テンプレートを実行して Dataflow ジョブを作成します。
[Dataflow] > [ジョブ] ページに移動します。
[テンプレートからジョブを作成] をクリックします。
[ジョブ名] フィールドに「
kafka-to-bq
」と入力します。[リージョン エンドポイント] で、Managed Service for Apache Kafka クラスタが配置されているリージョンを選択します。
Kafka to BigQuery テンプレートを選択します。
次のテンプレート パラメータを入力します。
- Kafka ブートストラップ サーバー: ブートストラップ サーバーのアドレス
- Source Kafka topic: 読み取るトピックの名前
- Kafka ソース認証モード:
APPLICATION_DEFAULT_CREDENTIALS
- Kafka message format:
JSON
- Table name strategy:
SINGLE_TABLE_NAME
- BigQuery 出力テーブル: BigQuery テーブル。
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
の形式で指定します。
[Dead letter queue] で、[Write errors to BigQuery] をオンにします。
デッドレター キューの BigQuery テーブル名を
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
の形式で入力します。このテーブルを事前に作成しないでください。これはパイプラインが作成します。
[ジョブを実行] をクリックします。
gcloud
dataflow flex-template run
コマンドを実行します。
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
次の変数を置き換えます。
LOCATION
: Managed Service for Apache Kafka が配置されているリージョンPROJECT_ID
: Google Cloud プロジェクトの名前CLUSTER_ID
: クラスタの IDTOPIC
: Kafka トピックの名前DATASET_NAME
: データセットの名前。TABLE_NAME
: テーブルの名前。ERROR_TABLE_NAME
: デッドレター キューの BigQuery テーブル名
デッドレター キューのテーブルを事前に作成しないでください。これはパイプラインが作成します。
Kafka にメッセージを送信する
Dataflow ジョブの開始後に Kafka にメッセージを送信すると、パイプラインがメッセージを BigQuery に書き込みます。
Kafka クラスタと同じサブネットに VM を作成し、Kafka コマンドライン ツールをインストールします。詳細な手順については、CLI を使用してメッセージを公開して使用するのクライアント マシンを設定するをご覧ください。
次のコマンドを実行して、Kafka トピックにメッセージを書き込みます。
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
次の変数を置き換えます。
TOPIC
: Kafka トピックの名前CLUSTER_ID
: クラスタの名前LOCATION
: クラスタが配置されているリージョンPROJECT_ID
: Google Cloud プロジェクトの名前
プロンプトで次のテキスト行を入力して、Kafka にメッセージを送信します。
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
デッドレター キューを使用する
ジョブの実行中に、パイプラインが個々のメッセージを BigQuery に書き込めないことがあります。考えられるエラーは次のとおりです。
- シリアル化エラー(JSON 形式が正しくない場合など)。
- テーブル スキーマと JSON データの不一致が原因で発生する型変換エラー。
- JSON データにあり、テーブル スキーマに存在しない余分なフィールド。
これらのエラーはジョブの失敗の原因にはならず、Dataflow ジョブログにエラーとして記録されません。代わりに、パイプラインはデッドレター キューを使用して、このようなタイプのエラーを処理します。
テンプレートの実行時にデッドレター キューを有効にするには、次のテンプレート パラメータを設定します。
useBigQueryDLQ
:true
outputDeadletterTable
: BigQuery テーブルの完全修飾名(例:my-project:dataset1.errors
)
パイプラインはテーブルを自動的に作成します。Kafka メッセージの処理中にエラーが発生すると、パイプラインはテーブルにエラーエントリを書き込みます。
エラー メッセージの例:
エラーの種類 | イベントデータ | errorMessage |
---|---|---|
シリアル化エラー | "Hello world" | Failed to serialize json to table row: "Hello world" |
型変換エラー | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
不明なフィールド | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
BigQuery のデータ型を操作する
内部的には、Kafka I/O コネクタは JSON メッセージ ペイロードを Apache Beam TableRow
オブジェクトに変換し、TableRow
フィールド値を BigQuery 型に変換します。
次の表に、BigQuery のデータ型の JSON 表現を示します。
BigQuery の型 | JSON 表現 |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" Well-known text(WKT)または GeoJSON を使用して、文字列としてフォーマットされた地理型を指定します。詳細については、地理空間データの読み込みをご覧ください。 |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" JavaScript の |
構造化データ
JSON メッセージが一貫したスキーマに従っている場合は、BigQuery の STRUCT
データ型を使用して JSON オブジェクトを表すことができます。
次の例では、answers
フィールドは 2 つのサブフィールド(a
と b
)を含む JSON オブジェクトです。
{"name":"Emily","answers":{"a":"yes","b":"no"}}
次の SQL ステートメントは、互換性のあるスキーマを持つ BigQuery テーブルを作成します。
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
結果のテーブルは次のようになります。
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
半構造化データ
JSON メッセージが厳密なスキーマに従っていない場合は、JSON
データ型として BigQuery に保存することを検討してください。JSON データを JSON
型として保存することで、スキーマを事前に定義する必要がなくなります。データの取り込み後、GoogleSQL のフィールド アクセス演算子(ドット表記)と配列アクセス演算子を使用して、データにクエリを実行できます。詳細については、GoogleSQL での JSON データの操作をご覧ください。
UDF を使用してデータを変換する
このチュートリアルでは、Kafka メッセージが JSON 形式で、BigQuery テーブル スキーマが JSON データと一致しており、データに変換が適用されていないことを前提としています。
BigQuery に書き込まれる前にデータを変換する JavaScript ユーザー定義関数(UDF)を指定することもできます。また UDF は、フィルタリング、個人を特定できる情報(PII)の削除、追加のフィールドによるデータの拡充などの処理も実行できます。
詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。