Dataflow テンプレートにユーザー定義関数を作成する

Google 提供の Dataflow テンプレートの中には、ユーザー定義関数(UDF)をサポートするものがあります。UDF を使用すると、テンプレート コードを変更せずにテンプレートの機能を拡張できます。

概要

UDF を作成するには、テンプレートに応じて JavaScript 関数または Python 関数を作成します。UDF コードファイルを Cloud Storage に保存し、テンプレート パラメータとして場所を指定します。テンプレートは、入力要素ごとに関数を呼び出します。関数は、要素を変換するか、他のカスタム ロジックを実行して、結果をテンプレートに返します。

たとえば、UDF を使用して次のことができます。

  • ターゲット スキーマに合わせて入力データを再フォーマットする。
  • 機密データを秘匿化する
  • 出力の一部の要素を除外する。

UDF 関数への入力は、JSON 文字列としてシリアル化された単一のデータ要素です。関数は、シリアル化された JSON 文字列を出力として返します。データ形式はテンプレートによって異なります。たとえば、Pub/Sub Subscription to BigQuery テンプレートでは、入力は JSON オブジェクトとしてシリアル化された Pub/Sub メッセージ データであり、出力は BigQuery テーブル行を表すシリアル化された JSON オブジェクトです。詳細については、各テンプレートのドキュメントをご覧ください。

UDF を使用してテンプレートを実行する

UDF を使用してテンプレートを実行するには、JavaScript ファイルの Cloud Storage のロケーションと関数の名前をテンプレート パラメータとして指定します。

Google 提供のテンプレートを使用すると、次のように Google Cloud コンソールで UDF を直接作成することもできます。

  1. Google Cloud コンソールで [Dataflow] ページに移動します。

    [Dataflow] ページに移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. 実行する Google 提供のテンプレートを選択します。

  4. [オプション パラメータ] を開きます。テンプレートが UDF をサポートしている場合、UDF の Cloud Storage のロケーションと関数名を指定するパラメータがあります。

  5. テンプレート パラメータの横にある [UDF を作成] をクリックします。

  6. [ユーザー定義関数(UDF)の選択または作成] パネルで次の操作を行います。

    1. ファイル名を入力します。例: my_udf.js
    2. Cloud Storage フォルダを選択します。例: gs://your-bucket/your-folder
    3. インライン コードエディタを使用して、関数を記述します。エディタにはボイラープレート コードが事前入力されているため、これを開始点として使用できます。
    4. [UDF を作成] をクリックします。

      Google Cloud コンソールで UDF ファイルが保存され、Cloud Storage のロケーションにデータが自動的に設定されます。

    5. 該当するフィールドに関数の名前を入力します。

JavaScript UDF を作成する

次のコードは、簡単な NoOps JavaScript UDF を示しています。

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

JavaScript コードは Nashorn JavaScript エンジンで実行されます。UDF は、デプロイ前に Nashorn エンジンでテストすることをおすすめします。Nashorn エンジンと JavaScript の Node.js 実装は完全には一致していません。よく見られる問題は console.log()Number.isNaN() を使用することです。Nashorn エンジンでは、どちらも定義されていません。

JDK 11 がプリインストールされている Cloud Shell を使用して、Nashorn エンジンで UDF をテストできます。次のように、Nashorn をインタラクティブ モードで起動します。

jjs --language=es6

Nashorn インタラクティブ シェルで、次の操作を行います。

  1. load を呼び出して UDF JavaScript ファイルを読み込みます。
  2. パイプラインで想定されるメッセージに応じて、入力 JSON オブジェクトを定義します。
  3. JSON.stringify 関数を使用して、入力を JSON 文字列にシリアル化します。
  4. UDF 関数を呼び出して JSON 文字列を処理します。
  5. JSON.parse を呼び出して、出力をシリアル化解除します。
  6. 結果を確認します。

例:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Python UDF を作成する

次のコードは、簡単な NoOps Python UDF を示しています。

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDF は、Python と Apache Beam の標準の依存関係パッケージをサポートしています。サードパーティのパッケージは使用できません。

エラー処理

通常、UDF の実行中にエラーが発生した場合、エラーはデッドレターの位置に書き込まれます。詳細はテンプレートによって異なります。たとえば、Pub/Sub Subscription to BigQuery テンプレートは _error_records テーブルを作成してエラーを書き込みます。ランタイム UDF エラーは、構文エラーまたはキャッチされない例外が原因で発生する可能性があります。構文エラーを確認するには、ローカルで UDF をテストします。

処理の対象外の要素については、プログラムで例外をスローできます。この場合、テンプレートでサポートされていれば、要素はデッドレター位置に書き込まれます。このアプローチの例については、ルートイベントをご覧ください。

サンプル ユースケース

このセクションでは、実際のユースケースに基づいて UDF の一般的なパターンについて説明します。

イベントを拡充する

UDF を使用して、新しいフィールドでイベントを拡充し、コンテキスト情報を提供します。

例:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

イベントを変換する

UDF を使用して、宛先で期待されている内容に応じてイベント全体の形式を変換します。

次の例では、可能であれば Cloud Logging ログエントリ(LogEntry)を元のログ文字列に戻します(ログソースによっては、元のログ文字列が textPayload フィールドに入力されることがあります)。このパターンを使用すると、Cloud Logging から LogEntry 全体を送信するのではなく、元の形式で未加工のログを送信できます。

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

イベントデータを秘匿化または削除する

UDF を使用して、イベントの一部を秘匿化または削除します。

次の例では、フィールド名 sensitiveField の値を置き換えて秘匿化し、redundantField という名前のフィールドを完全に削除します。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

ルートイベント

UDF を使用して、ダウンストリーム シンク内の別々の宛先にイベントを転送します。

次の例は、Pub/Sub to Splunk テンプレートに基づいて、各イベントを正しい Splunk インデックスに転送します。ユーザー定義のローカル関数を呼び出して、イベントをインデックスにマッピングします。

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

次の例では、テンプレートがデッドレター キューをサポートしていると仮定して、認識できないイベントをデッドレター キューに転送します(たとえば、Pub/Sub to JDBC テンプレートをご覧ください)。このパターンを使用すると、宛先に書き込む前に予期しないエントリを除外できます。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

イベントをフィルタする

UDF を使用して、望ましくないイベントや認識できないイベントを出力からフィルタリングします。

次の例では、data.severity"DEBUG" と等しいイベントを削除します。

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

次のステップ