Dataflow を使用して Kafka から BigQuery にデータを書き込む

このページでは、Dataflow を使用して Google Cloud Managed Service for Apache Kafka からデータを読み取り、レコードを BigQuery テーブルに書き込む方法について説明します。このチュートリアルでは、Apache Kafka to BigQuery テンプレートを使用して Dataflow ジョブを作成します。

概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。Kafka は分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Dataflow を使用すると、Kafka からイベントを読み取って処理し、結果をさらに分析するために BigQuery テーブルに書き込むことができます。

Managed Service for Apache Kafka は、安全でスケーラブルな Kafka クラスタの実行に役立つ Google Cloud サービスです。

Kafka イベントを読み取って BigQuery に書き込む
Apache Kafka を使用したイベント ドリブン アーキテクチャ

必要な権限

Dataflow ワーカー サービス アカウントには、次の Identity and Access Management(IAM)ロールが必要です。

  • マネージド Kafka クライアント(roles/managedkafka.client
  • BigQuery データ編集者(roles/bigquery.dataEditor

詳細については、Dataflow のセキュリティと権限をご覧ください。

Kafka クラスタを作成する

このステップでは、Managed Service for Apache Kafka クラスタを作成します。詳細については、Managed Service for Apache Kafka クラスタを作成するをご覧ください。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. [作成] をクリックします。

  3. [クラスタ名] フィールドに、クラスタの名前を入力します。

  4. [リージョン] リストで、クラスタのロケーションを選択します。

  5. [作成] をクリックします。

gcloud

managed-kafka clusters create コマンドを実行します。

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

次のように置き換えます。

  • CLUSTER: クラスタの名前
  • REGION: サブネットを作成したリージョン
  • PROJECT_ID: プロジェクト ID
  • SUBNET_NAME: クラスタをデプロイするサブネット

通常、クラスタの作成には 20~30 分かかります。

Kafka トピックを作成する

Managed Service for Apache Kafka クラスタを作成したら、トピックを作成します。

コンソール

  1. [Managed Service for Apache Kafka] > [クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタの名前をクリックします。

  3. クラスタの詳細ページで、[トピックを作成] をクリックします。

  4. [トピック名] ボックスに、トピックの名前を入力します。

  5. [作成] をクリックします。

gcloud

managed-kafka topics create コマンドを実行します。

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

次のように置き換えます。

  • TOPIC_NAME: 作成するトピックの名前

BigQuery テーブルを作成する

この手順では、次のスキーマを持つ BigQuery テーブルを作成します。

列名 データ型
name STRING
customer_id INTEGER

BigQuery データセットがまだない場合は、まず作成します。詳細については、データセットの作成をご覧ください。次に、新しい空のテーブルを作成します。

コンソール

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. [エクスプローラ] ペインでプロジェクトを開き、データセットを選択します。

  3. [データセット情報] セクションで、[ テーブルを作成] をクリックします。

  4. [テーブルの作成元] リストで [空のテーブル] を選択します。

  5. [テーブル] ボックスにテーブルの名前を入力します。

  6. [スキーマ] セクションで [テキストとして編集] をクリックします。

  7. 次のスキーマ定義を貼り付けます。

    name:STRING,
    customer_id:INTEGER
    
  8. [テーブルを作成] をクリックします。

gcloud

bq mk コマンドを使用します。

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • DATASET_NAME: データセットの名前。
  • TABLE_NAME: 作成するテーブルの名前。

Dataflow ジョブを実行する

Kafka クラスタと BigQuery テーブルを作成したら、Dataflow テンプレートを実行します。

コンソール

まず、クラスタのブートストラップ サーバー アドレスを取得します。

  1. Google Cloud コンソールで、[クラスタ] ページに移動します。

    [クラスタ] に移動

  2. クラスタ名をクリックします。

  3. [構成] タブをクリックします。

  4. [ブートストラップ URL] からブートストラップ サーバーのアドレスをコピーします。

次に、テンプレートを実行して Dataflow ジョブを作成します。

  1. [Dataflow] > [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. [ジョブ名] フィールドに「kafka-to-bq」と入力します。

  4. [リージョン エンドポイント] で、Managed Service for Apache Kafka クラスタが配置されているリージョンを選択します。

  5. Kafka to BigQuery テンプレートを選択します。

  6. 次のテンプレート パラメータを入力します。

    • Kafka ブートストラップ サーバー: ブートストラップ サーバーのアドレス
    • Source Kafka topic: 読み取るトピックの名前
    • Kafka ソース認証モード: APPLICATION_DEFAULT_CREDENTIALS
    • Kafka message format: JSON
    • Table name strategy: SINGLE_TABLE_NAME
    • BigQuery 出力テーブル: BigQuery テーブル。PROJECT_ID:DATASET_NAME.TABLE_NAME の形式で指定します。
  7. [Dead letter queue] で、[Write errors to BigQuery] をオンにします。

  8. デッドレター キューの BigQuery テーブル名を PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME の形式で入力します。

    このテーブルを事前に作成しないでください。これはパイプラインが作成します。

  9. [ジョブを実行] をクリックします。

gcloud

dataflow flex-template run コマンドを実行します。

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

次の変数を置き換えます。

  • LOCATION: Managed Service for Apache Kafka が配置されているリージョン
  • PROJECT_ID: Google Cloud プロジェクトの名前
  • CLUSTER_ID: クラスタの ID
  • TOPIC: Kafka トピックの名前
  • DATASET_NAME: データセットの名前。
  • TABLE_NAME: テーブルの名前。
  • ERROR_TABLE_NAME: デッドレター キューの BigQuery テーブル名

デッドレター キューのテーブルを事前に作成しないでください。これはパイプラインが作成します。

Kafka にメッセージを送信する

Dataflow ジョブの開始後に Kafka にメッセージを送信すると、パイプラインがメッセージを BigQuery に書き込みます。

  1. Kafka クラスタと同じサブネットに VM を作成し、Kafka コマンドライン ツールをインストールします。詳細な手順については、CLI を使用してメッセージを公開して使用するクライアント マシンを設定するをご覧ください。

  2. 次のコマンドを実行して、Kafka トピックにメッセージを書き込みます。

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    次の変数を置き換えます。

    • TOPIC: Kafka トピックの名前
    • CLUSTER_ID: クラスタの名前
    • LOCATION: クラスタが配置されているリージョン
    • PROJECT_ID: Google Cloud プロジェクトの名前
  3. プロンプトで次のテキスト行を入力して、Kafka にメッセージを送信します。

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

デッドレター キューを使用する

ジョブの実行中に、パイプラインが個々のメッセージを BigQuery に書き込めないことがあります。考えられるエラーは次のとおりです。

  • シリアル化エラー(JSON 形式が正しくない場合など)。
  • テーブル スキーマと JSON データの不一致が原因で発生する型変換エラー。
  • JSON データにあり、テーブル スキーマに存在しない余分なフィールド。

これらのエラーはジョブの失敗の原因にはならず、Dataflow ジョブログにエラーとして記録されません。代わりに、パイプラインはデッドレター キューを使用して、このようなタイプのエラーを処理します。

テンプレートの実行時にデッドレター キューを有効にするには、次のテンプレート パラメータを設定します。

  • useBigQueryDLQ: true
  • outputDeadletterTable: BigQuery テーブルの完全修飾名(例: my-project:dataset1.errors

パイプラインはテーブルを自動的に作成します。Kafka メッセージの処理中にエラーが発生すると、パイプラインはテーブルにエラーエントリを書き込みます。

エラー メッセージの例:

エラーの種類 イベントデータ errorMessage
シリアル化エラー "Hello world" Failed to serialize json to table row: "Hello world"
型変換エラー {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
不明なフィールド {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

BigQuery のデータ型を操作する

内部的には、Kafka I/O コネクタは JSON メッセージ ペイロードを Apache Beam TableRow オブジェクトに変換し、TableRow フィールド値を BigQuery 型に変換します。

次の表に、BigQuery のデータ型の JSON 表現を示します。

BigQuery の型 JSON 表現
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Well-known text(WKT)または GeoJSON を使用して、文字列としてフォーマットされた地理型を指定します。詳細については、地理空間データの読み込みをご覧ください。

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

JavaScript の Date.toJSON メソッドを使用して値をフォーマットします。

構造化データ

JSON メッセージが一貫したスキーマに従っている場合は、BigQuery の STRUCT データ型を使用して JSON オブジェクトを表すことができます。

次の例では、answers フィールドは 2 つのサブフィールド(ab)を含む JSON オブジェクトです。

{"name":"Emily","answers":{"a":"yes","b":"no"}}

次の SQL ステートメントは、互換性のあるスキーマを持つ BigQuery テーブルを作成します。

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

結果のテーブルは次のようになります。

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

半構造化データ

JSON メッセージが厳密なスキーマに従っていない場合は、JSON データ型として BigQuery に保存することを検討してください。JSON データを JSON 型として保存することで、スキーマを事前に定義する必要がなくなります。データの取り込み後、GoogleSQL のフィールド アクセス演算子(ドット表記)と配列アクセス演算子を使用して、データにクエリを実行できます。詳細については、GoogleSQL での JSON データの操作をご覧ください。

UDF を使用してデータを変換する

このチュートリアルでは、Kafka メッセージが JSON 形式で、BigQuery テーブル スキーマが JSON データと一致しており、データに変換が適用されていないことを前提としています。

BigQuery に書き込まれる前にデータを変換する JavaScript ユーザー定義関数(UDF)を指定することもできます。また UDF は、フィルタリング、個人を特定できる情報(PII)の削除、追加のフィールドによるデータの拡充などの処理も実行できます。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

次のステップ