Dataflow を使用して Kafka から BigQuery にデータを書き込む

このドキュメントでは、Apache Kafka から BigQuery にストリーミングする Dataflow パイプラインの作成とデプロイの概要について説明します。

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。Kafka は分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Dataflow を使用すると、Kafka からイベントを読み取って処理し、結果をさらに分析するために BigQuery テーブルに書き込むことができます。

Kafka イベントを読み取って BigQuery に書き込む

Google は、Kafka to BigQuery パイプラインを構成する Dataflow テンプレートを提供しています。このテンプレートは、Apache Beam SDK で提供される BigQueryIO コネクタを使用します。

このテンプレートを使用するには、次の操作を行います。

  1. Kafka を Google Cloud または他の場所にデプロイします。
  2. ネットワークを構成します。
  3. Identity and Access Management(IAM)の権限を設定します。
  4. イベントデータを変換する関数を作成します。
  5. BigQuery 出力テーブルを作成します。
  6. Dataflow テンプレートをデプロイします。

Kafka をデプロイする

Google Cloud では、Kafka クラスタを Compute Engine 仮想マシン(VM)インスタンスにデプロイできます。また、サードパーティのマネージド Kafka サービスを使用することもできます。Google Cloud のデプロイ オプションの詳細については、Apache Kafka とはをご覧ください。サードパーティの Kafka ソリューションは、Google Cloud Marketplace にあります。

また、Google Cloud の外部に配置された既存の Kafka クラスタを利用できる場合があります。たとえば、オンプレミスや別のパブリック クラウドにデプロイされた既存のワークロードを利用できる可能性があります。

ネットワークを構成する

デフォルトでは、Dataflow はデフォルトの Virtual Private Cloud(VPC)ネットワーク内でインスタンスを起動します。Kafka の構成によっては、Dataflow に異なるネットワークとサブネットを構成しなければならない場合があります。詳細については、ネットワークとサブネットワークの指定をご覧ください。 ネットワークを構成するときは、Dataflow ワーカーマシンが Kafka ブローカーにアクセスできるようにファイアウォール ルールを作成します。

VPC Service Controls を使用している場合は、Kafka クラスタを VPC Service Controls の境界内に配置するか、認可済み VPN または Cloud Interconnect に境界を拡張します。

Kafka クラスタが Google Cloud の外部にデプロイされている場合は、Dataflow と Kafka クラスタの間にネットワーク接続を作成する必要があります。複数のネットワーク オプションがあり、それぞれトレードオフがあります。

予測可能なパフォーマンスと信頼性という点で Dedicated Interconnect が最良のオプションですが、サードパーティが新しい回線をプロビジョニングする必要があるためセットアップに時間がかかることがあります。パブリック IP ベースのトポロジでは、必要となるネットワーキング作業がほとんどないため、すぐに使い始めることができます。

以降のセクションでは、これらのオプションについて詳しく説明します。

共有 RFC 1918 アドレス空間

Dedicated Interconnect と IPsec VPN の両方式は、Virtual Private Cloud(VPC)の RFC 1918 IP アドレスに直接アクセスできるため、Kafka の構成を簡素化できます。VPN ベースのトポロジを使用している場合は、ハイスループット VPN のセットアップを検討してください。

デフォルトでは、Dataflow はデフォルトの VPC ネットワークでインスタンスを起動します。プライベート ネットワーク トポロジで、Cloud Router で明示的に定義されたルートによって Google Cloud 内のサブネットワークが Kafka クラスタに接続されている場合は、Dataflow インスタンスをどこに配置するかを自分で制御できることが必要になります。Dataflow を使用して、networksubnetwork実行パラメータを構成できます。

Dataflow がスケールアウトを試みてインスタンスを起動するとき、対応するサブネットワークで十分な数の IP アドレスを使用できることを確認してください。また、Dataflow インスタンスを起動するために別のネットワークを作成する場合は、プロジェクト内のすべての仮想マシン間の TCP トラフィックを有効にするファイアウォール ルールを設定するようにしてください。このファイアウォール ルールは、デフォルトのネットワークにはすでに構成されています。

パブリック IP アドレス空間

このアーキテクチャでは、Transport Layer Security(TLS)を使用して外部クライアントと Kafka との間のトラフィックを保護し、ブローカー間の通信には非暗号化トラフィックを使用します。Kafka リスナーが、内部通信と外部通信の両方に使用されるネットワーク インターフェースにバインドする場合、リスナーを構成するのは簡単です。ただし、多くのシナリオでは、外部にアドバタイズされる、クラスタ内の Kafka ブローカーのアドレスは、Kafka が使用する内部ネットワーク インターフェースのものとは異なります。このような場合は、advertised.listeners プロパティを使用できます。

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

外部クライアントはポート 9093 を使用して SSL チャネル経由で接続し、内部クライアントはポート 9092 を使用して平文チャネル経由で接続します。advertised.listeners でアドレスを指定する際は、外部トラフィックと内部トラフィックの両方で同じインスタンスに解決される DNS 名を使用してください(このサンプルの場合は kafkabroker-n.mydomain.com)。パブリック IP アドレスは内部トラフィックでは解決できない可能性があるため、パブリック IP アドレスを使用すると機能しない場合があります。

IAM 権限を設定する

Dataflow ジョブは、次の 2 つの IAM サービス アカウントを使用します。

  • Dataflow サービスは、Dataflow サービス アカウントを使用して Google Cloud リソースを操作します(VM の作成など)。
  • Dataflow ワーカー VM は、ワーカー サービス アカウントを使用してパイプラインのファイルとその他のリソースにアクセスします。このサービス アカウントには、BigQuery 出力テーブルへの書き込みアクセス権が必要です。また、パイプライン ジョブが参照する他のリソースに対するアクセス権も必要です。

これら 2 つのサービス アカウントに適切なロールがあることを確認してください。詳細については、Dataflow のセキュリティと権限をご覧ください。

BigQuery 用にデータを変換する

Kafka to BigQuery テンプレートでは、1 つ以上の Kafka トピックからイベントを読み取り、それらを BigQuery テーブルに書き込むパイプラインを作成します。BigQuery に書き込まれる前にイベントデータを変換する JavaScript ユーザー定義関数(UDF)を指定することもできます。

パイプラインからの出力は、出力テーブルのスキーマに一致する JSON 形式のデータである必要があります。Kafka イベントデータがすでに JSON 形式になっている場合は、一致するスキーマを使用して BigQuery テーブルを作成し、イベントを直接 BigQuery に渡すことができます。それ以外の場合は、イベントデータを入力として受け取り、BigQuery テーブルと一致する JSON データを返す UDF を作成します。

たとえば、イベントデータに次の 2 つのフィールドが含まれているとします。

  • name(文字列)
  • customer_id(整数)

Dataflow パイプラインからの出力は、次のようになります。

{ "name": "Alice", "customer_id": 1234 }

イベントデータが JSON 形式になっていない場合は、次のようにデータを変換する UDF を作成します。

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

UDF は、イベントのフィルタリング、個人情報(PII)の削除、追加のフィールドによるデータの拡充など、イベントデータに対する追加の処理を実行できます。

テンプレート用の UDF の作成方法について詳しくは、UDF による Dataflow テンプレートの拡張をご覧ください。JavaScript ファイルを Cloud Storage にアップロードします。

BigQuery 出力テーブルを作成する

テンプレートを実行する前に、BigQuery 出力テーブルを作成します。テーブル スキーマは、パイプラインからの JSON 出力に対応している必要があります。パイプラインは、JSON ペイロードのプロパティごとに同じ名前の BigQuery テーブル列に値を書き込みます。JSON 内で存在しないプロパティは NULL 値として解釈されます。

前の例では、BigQuery テーブルには次の列が存在します。

列名 データ型
name STRING
customer_id INTEGER

CREATE TABLE SQL ステートメントを使用して、テーブルを作成できます。

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

また、JSON 定義ファイルを使用してテーブル スキーマを指定することもできます。詳細については、BigQuery ドキュメントのスキーマの指定をご覧ください。

Dataflow ジョブを実行する

BigQuery テーブルを作成したら、Dataflow テンプレートを実行します。

コンソール

Google Cloud コンソールを使用して Dataflow ジョブを作成するには、次の操作を行います。

  1. Google Cloud コンソールで [Dataflow] ページに移動します。
  2. [テンプレートからジョブを作成] をクリックします。
  3. [ジョブ名] フィールドにジョブ名を入力します。
  4. [リージョン エンドポイント] でリージョンを選択します。
  5. Kafka to BigQuery テンプレートを選択します。
  6. [必須パラメータ] に、BigQuery 出力テーブルの名前を入力します。このテーブルはすでに存在し、有効なスキーマが設定されている必要があります。
  7. [オプション パラメータを表示] をクリックして、少なくとも次のパラメータの値を入力します。

    • 入力を読み取る Kafka トピック。
    • Kafka ブートストラップ サーバーのリスト(カンマ区切り)。
    • サービス アカウントのメールアドレス。

    必要に応じて、追加のパラメータを入力します。特に、次のパラメータを指定しなければならない場合があります。

    • ネットワーキング: デフォルト ネットワーク以外の VPC ネットワークを使用するには、ネットワークとサブネットを指定します。
    • UDF: JavaScript UDF を使用するには、スクリプトの Cloud Storage ロケーションと、呼び出す JavaScript 関数の名前を指定します。

gcloud

Google Cloud CLI を使用して Dataflow ジョブを作成するには、次のコマンドを実行します。

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

次の変数を置き換えます。

  • JOB_NAME。任意のジョブ名
  • LOCATION。ジョブを実行するリージョン。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。
  • KAFKA_TOPICS。読み取る Kafka トピックのカンマ区切りのリスト。
  • BOOTSTRAP_SERVERS。Kafka ブートストラップ サーバーのカンマ区切りのリスト。例: 127:9092,127.0.0.1:9093
  • OUTPUT_TABLE。BigQuery 出力テーブル。PROJECT_ID:DATASET_NAME.TABLE_NAME として指定します。例: my_project:dataset1.table1
  • IAM_SERVICE_ACCOUNT。省略できます。ジョブを実行するサービス アカウントのメールアドレス。
  • UDF_SCRIPT_PATH。省略できます。UDF を含む JavaScript ファイルへの Cloud Storage パス。例: gs://your-bucket/your-function.js
  • UDF_FUNCTION_NAME。省略できます。UDF として呼び出す JavaScript 関数の名前。
  • VPC_NETWORK_NAME。省略できます。ワーカーが割り当てられるネットワーク。
  • SUBNET_NAME。省略できます。ワーカーが割り当てられるサブネットワーク。

データ型

このセクションでは、BigQuery テーブル スキーマのさまざまなデータ型を処理する方法について説明します。

内部的には、JSON メッセージは TableRow オブジェクトに変換され、TableRow フィールド値は BigQuery 型に変換されます。

スカラー型

次の例では、文字列型、数値型、ブール型、日時型、時間隔型、地理型など、さまざまなスカラーデータ型を持つ BigQuery テーブルを作成しています。

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

互換性のあるフィールドを持つ JSON ペイロードは次のとおりです。

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

注:

  • TIMESTAMP 列では、JavaScript の Date.toJSON メソッドを使用して値をフォーマットできます。
  • GEOGRAPHY 列では、Well-known text(WKT)または GeoJSON を使用して、文字列としてフォーマットされた地理型を指定できます。詳細については、地理空間データの読み込みをご覧ください。

BigQuery のデータ型の詳細については、データ型をご覧ください。

配列

ARRAY データ型を使用すると、BigQuery に配列を保存できます。次の例では、JSON ペイロードには、値が JSON 配列である scores という名前のプロパティが含まれています。

{"name":"Emily","scores":[10,7,10,9]}

次の CREATE TABLE SQL ステートメントは、互換性のあるスキーマを持つ BigQuery テーブルを作成します。

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

結果のテーブルは次のようになります。

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

構造

BigQuery の STRUCT データ型には、名前付きフィールドの順序付きリストが含まれています。STRUCT を使用すると、一貫したスキーマに従う JSON オブジェクトを保持できます。

次の例では、JSON ペイロードには、値が JSON オブジェクトである val という名前のプロパティが含まれています。

{"name":"Emily","val":{"a":"yes","b":"no"}}

次の CREATE TABLE SQL ステートメントは、互換性のあるスキーマを持つ BigQuery テーブルを作成します。

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

結果のテーブルは次のようになります。

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

半構造化イベントデータ

Kafka イベントデータが厳密なスキーマに従っていない場合は、JSON データ型(プレビュー)として BigQuery に保存することを検討してください。JSON データを JSON データ型として保存することで、イベント スキーマを事前に定義する必要がなくなります。データの取り込み後、フィールド アクセス演算子(ドット表記)と配列アクセス演算子を使用して出力テーブルにクエリを実行できます。

まず、JSON 列を持つテーブルを作成します。

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

次に、イベント ペイロードを JSON オブジェクト内にラップする JavaScript UDF を定義します。

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

データが BigQuery に書き込まれた後、フィールド アクセス演算子を使用して個々のフィールドにクエリを実行できます。たとえば、次のクエリは各レコードの name フィールドの値を返します。

SELECT event_data.name FROM my_dataset1.kafka_events;

BigQuery で JSON を使用する方法の詳細については、Google 標準 SQL での JSON データの操作をご覧ください。

エラーとロギング

パイプラインの実行でエラーが発生したり、個々の Kafka イベントの処理中にエラーが発生することがあります。

パイプラインのエラー処理の詳細については、パイプラインのトラブルシューティングとデバッグをご覧ください。

ジョブが正常に実行されているときに、個々の Kafka イベントの処理中にエラーが発生した場合、パイプライン ジョブは BigQuery のテーブルにエラーレコードを書き込みます。ジョブ自体は失敗しません。イベントレベルのエラーは Dataflow ジョブログにエラーとして記録されません。

パイプライン ジョブは、エラーレコードを保持するためのテーブルを自動的に作成します。デフォルトでは、このテーブルの名前は output_table_error_records になります。output_table は出力テーブルの名前です。たとえば、出力テーブルの名前が kafka_events の場合、エラーテーブルの名前は kafka_events_error_records になります。別の名前を指定するには、outputDeadletterTable テンプレート パラメータを設定します。

outputDeadletterTable=my_project:dataset1.errors_table

考えられるエラーは次のとおりです。

  • シリアル化エラー(JSON 形式が正しくない場合など)。
  • テーブル スキーマと JSON データの不一致が原因で発生する型変換エラー。
  • JSON データにあり、テーブル スキーマに存在しない余分なフィールド。

エラー メッセージの例:

エラーの種類 イベントデータ errorMessage
シリアル化エラー "Hello world" Failed to serialize json to table row: "Hello world"
型変換エラー {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
不明なフィールド {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

次のステップ