コンテンツに移動
デベロッパー

UDF による Dataflow テンプレートの拡張

2021年9月1日
https://storage.googleapis.com/gweb-cloudblog-publish/original_images/dataflow-hero.gif
Google Cloud Japan Team

※この投稿は米国時間 2021 年 8 月 12 日に、Google Cloud blog に投稿されたものの抄訳です。

Google は、お客様が頻繁に行うデータタスクで一般的に使われる Dataflow テンプレートのセットを提供しています。また、デベロッパーが拡張できる参照用のデータ パイプラインとしても利用できます。しかし、Dataflow テンプレートのコード自体の変更や保守を行わず、Dataflow テンプレートをカスタマイズすることも可能です。ユーザー定義関数(UDF)を使用すると、カスタム ロジックで特定の Dataflow テンプレートを拡張し、レコードをその場で変換できます。

https://storage.googleapis.com/gweb-cloudblog-publish/images/Dataflow_UDF_Messages_Transformation_TeqVtg8.max-800x800.png

UDF は簡単な要素処理のロジックを実装する JavaScript スニペットで、Dataflow パイプラインへの入力パラメータとして指定されます。これは、再コンパイルやテンプレートのコード自体の保守の必要なしに、パイプラインの出力形式をカスタマイズしたいユーザーには特に便利です。使用例として、レコードにフィールドを追加して拡張する、一部の機密性のフィールドを消去する、必要のないレコードをフィルタにより除外するなどが挙げられます。それぞれの使用例について以下に解説します。この機能により、Apache Beam デベロッパーでなくても、開発環境がセットアップされていなくても、これらの Dataflow テンプレートの出力を変更できます。

この記事の時点で、Google から提供されている Dataflow テンプレートのうち次のものが UDF をサポートしています。

注: ここに記載している UDF のコンセプトは UDF をサポートするすべての Dataflow テンプレートに適用されます。以下のユーティリティ UDF サンプルは Pub/Sub to Splunk Dataflow テンプレートを使用する現実のユースケースからのものですが、これらは同一または他の Dataflow テンプレートに使用するための出発点として再利用できます。

テンプレートでの UDF の動作

UDF が提供されているとき、UDF JavaScript コードは Dataflow ワーカーの Java ランタイムに含まれている Nashorn JavaScript エンジンで実行されます(Google から提供されている Dataflow テンプレートなどの Java パイプラインが該当します)。コードは、各要素について Dataflow ワーカーによりローカルで別々に起動されます。要素のペイロードはシリアル化され、JSON 文字列として相互に渡されます。

Dataflow UDF 関数から呼び出されるプロセスの形式は次のとおりです。これらを再利用し、カスタムの変換ロジックを挿入できます。

読み込んでいます...

UDF は最初に、JavaScript 標準の組み込み JSON オブジェクトを使用して、文字列化された要素 inJson を変数 obj に解釈し、終了時には変更された要素 obj の文字列化されたバージョン outJson を返す必要があります。ハイライト表示されている部分には、お使いのユースケースに応じたカスタムの要素変換ロジックを追加します。次のセクションでは、実際のユースケースからのユーティリティ UDF サンプルを紹介します。

注: 変数 includePubsubMessage は、UDF が Pub/Sub to Splunk Dataflow テンプレートに適用される場合は必須です。これは、このテンプレートでサポートされる要素形式が 2 つ存在するためです。このテンプレートは完全な Pub/Sub メッセージ ペイロード、または基礎となる Pub/Sub メッセージ データ ペイロード(デフォルト動作)のみを処理するよう構成可能です。data 変数を設定するステートメントは、次の例のように UDF での以後の変換ロジックを簡素化する目的で UDF 入力ペイロードを正規化するため必要です。他のコンテキストについては、Pub/Sub to Splunk テンプレートのドキュメントにある includePubsubMessage パラメータを参照してください。

一般的な UDF パターン

次のコード スニペットは、上記の UDF 処理関数に挿入する変換ロジックの例です。これらを、一般的なパターンごとにグループ化して以下に示します。

パターン 1: イベントの拡張

https://storage.googleapis.com/gweb-cloudblog-publish/images/1-enrich-events_k4t3CtR.max-700x700.jpg

このパターンでは、新しいフィールドでコンテキスト情報を追加してイベントを拡張します。

  • 例 1.1:
    新しいフィールドをメタデータとして追加し、パイプラインの入力 Pub/Sub サブスクリプションを追跡する

読み込んでいます...


  • 例 1.2*:
    Splunk HEC メタデータ ソース フィールドを、パイプラインの入力 Pub/Sub サブスクリプションを追跡するよう設定する

読み込んでいます...

  • 例 1.3:

ユーザー定義のローカル関数に基づいて新しいフィールドを追加する、例: callerToAppIdLookup() を静的マッピングまたはルックアップ テーブルとして動作させる

読み込んでいます...

 

パターン 2: 変換イベント

https://storage.googleapis.com/gweb-cloudblog-publish/images/2-transform-events_xYwWrYg.max-800x800.jpg

このパターンでは、宛先で期待されているものに応じてイベント全体の形式を変換します。

  • 例 2.1:
    ログを Cloud Logging ログペイロード(LogEntry)から、元のログ文字列に戻します。このパターンを VM アプリケーションまたはシステムログ(例: syslog や Windows イベントログ)とともに使用し、ソースの元のログを(JSON ペイロードの代わりに)送信できます。

読み込んでいます...

 

  • 例 2.2*:
Splunk HEC イベント メタデータを設定して、ログを Cloud Logging ログペイロード(LogEntry)から、元のログ文字列に変換します。このパターンをアプリケーションまたは VM ログ(例: syslog や Windows イベントログ)とともに使用し、ダウンストリーム分析との互換性のため、元のログを(JSON ペイロードの代わりに)インデックスに登録できます。この例では、受信リソースラベルのメタデータに HEC フィールド メタデータを設定し、ログの拡張も行います。
読み込んでいます...


パターン 3: イベントの消去

https://storage.googleapis.com/gweb-cloudblog-publish/images/3-redact-events_FUxgh4E.max-500x500.jpg

このパターンでは、イベントの一部を消去または削除します。

読み込んでいます...

パターン 4: イベントのルーティング

https://storage.googleapis.com/gweb-cloudblog-publish/images/4-route-events.max-500x500.jpg

このパターンでは、イベントをプログラムにより別々の宛先にルーティングします。

  • 例 4.1*:
    ユーザー定義のローカル関数に従い、イベントを正しい Splunk インデックスにルーティングします。たとえば、splunkIndexLookup() が静的なマッピングやルックアップ テーブルとして動作するようにします。

読み込んでいます...

  • 例 4.2:

認識されない、またはサポートされていないイベントを、Pub/Sub deadletter トピック(構成されている場合)にルーティングし、無効なデータ、または BigQuery や Splunk などダウンストリーム シンクでの不要な消費を回避します。

読み込んでいます...

 

パターン 5: イベントのフィルタリング

https://storage.googleapis.com/gweb-cloudblog-publish/images/5-filter-events.max-500x500.jpg

このパターンでは、望ましくない、または認識されないイベントをフィルタリングして除外します。

  • 例 5.1:
    特定のリソースタイプやログタイプからのイベントをドロップします。たとえば、ワーカーやシステムログなどの冗長な Dataflow オペレーション ログをフィルタリングで除去できます。

読み込んでいます...

 

  • 例 5.2:
特定のログタイプ、たとえば Cloud Run アプリケーションの stdout からのイベントをドロップします。

読み込んでいます...

* この例は、Pub/Sub to Splunk Dataflow テンプレートにのみ適用されます

UDF のテスト

機能の正しさを保証する以外にも、Dataflow ワーカーにプリインストールされている JDK(8 から 14 まで)の一部として配布されている Nashorn JavaScript エンジンにおいて、UDF コードが構文的に正しい JavaScript であることを確認する必要があります。UDF は最終的に、このエンジンで実行されます。パイプラインのデプロイ前に、Nashorn エンジンで UDF をテストすることを強くおすすめします。JavaScript 構文エラーがあると例外がスローされ、すべてのメッセージで発生する可能性もあります。これにより、UDF がそのメッセージを即座に処理できなくなり、パイプラインが停止する恐れがあります。

この記事の時点で、Google が提供している Dataflow テンプレートは、対応する Nashorn エンジン v11 リリースを含む JDK 11 環境で動作します。デフォルトでは、Nashorn エンジンは ECMAScript 5.1(ES5)のみを遵守しているため、let や const などの新しい ES6 JavaScript キーワードは構文エラーを引き起こします。また、Oracle Nashorn エンジンは Node.js と JavaScript の実装が多少異なることにも注意してください。よく見られる失敗は、たとえば console.log() や Number.isNaN() を使用することです。これらはいずれも Nashorn エンジンで定義されていません。詳細については、この Oracle Nashorn 使用法の概要をご覧ください。このような点に注意すれば、ここで解説したユーティリティ UDF は大きなコードの変更なしに、ほとんどのユースケースに対応できます。

UDF を Nashorn エンジンでテストする簡単な方法は、Cloud Shell を立ち上げることです。ここには Nashorn エンジンを起動する jjs コマンドライン ツールを含む JDK 11 がプリインストールされています。

UDF が dataflow_udf_transform.js JavaScript ファイルに保存されており、新しい inputSubscription フィールドを追加する上述の UDF の例 1.1 を使用すると仮定します。

Cloud Shell で、Nashorn を次のようにインタラクティブ モードで起動します。

読み込んでいます...

Nashorn インタラクティブ シェルで、作成した UDF JavaScript ファイルを最初に読み込みます。これにより、UDF "process" 関数がグローバル スコープで読み込まれます。

読み込んでいます...

UDF をテストするには、パイプライン上で想定される即時メッセージに応じて、任意の入力 JSON オブジェクトを定義します。この例では、パイプラインで処理する Dataflow ジョブのログメッセージのスニペットを使用します。

読み込んでいます...

これで、UDF 関数を起動して入力オブジェクトを次のように処理できます。

読み込んでいます...

UDF は前のセクションで注記したように入力文字列を期待するため、入力オブジェクトは UDF に渡される前にシリアル化されることに注意してください。

UDF 出力をプリントすると、変換されたログが、予測されるように追加された inputSubscription フィールドとともに表示されます。

読み込んでいます...

最後に、インタラクティブ シェルを終了します。

読み込んでいます...

 

UDF のデプロイ

https://storage.googleapis.com/gweb-cloudblog-publish/images/dataflow-pipeline_TU8Or6y.max-1000x1000.jpg

UDF JavaScript ファイルを含む GCS ファイルを参照して Dataflow ジョブを開始するとき、UDF をデプロイします。gcloud CLI を使用して、Pub/Sub to Splunk Dataflow テンプレートを使用するジョブを開始する例を示します。

読み込んでいます...

関連する次のパラメータを構成します。

  • gcs-location: Dataflow テンプレートの GCS ロケーション パス

  • javascriptTextTransformGcsPath: UDF コードを持つ JavaScript ファイルの GCS ロケーション パス

  • javascriptTextTransformFunctionName: UDF として呼び出す JavaScript 関数の名前

Dataflow のユーザーまたはオペレーターとして、既存のテンプレート URL(Google でホストされているもの)と自分のカスタム UDF(お客様がホストしているもの)を参照するだけでよく、Beam デベロッパー環境のセットアップや、テンプレート コード自体の保守は必要ありません。

注: 使用する Dataflow ワーカー サービス アカウントには、UDF 関数を含む GCS オブジェクト(JavaScript ファイル)へのアクセス権が必要です。Dataflow ワーカー サービス アカウントの詳細については、Dataflow ユーザー ドキュメントを参照してください。

次のステップ

上述のユーティリティ UDF のいずれかを使用して Google から提供されている既成の Dataflow テンプレートをカスタマイズする、または独自の UDF 関数を作成する方法の初歩について解説しました。パイプラインのデプロイの技術的なアーティファクトとして、UDF はインフラストラクチャのコンポーネントなので、UDF のバージョン管理など Infrastructure-as-Code(IaC)のベスト プラクティスに従うことをおすすめします。他のユーティリティ UDF についてご質問やご提案がある場合は、GitHub リポジトリで直接問題を作成するか、Google の Stack Overflow フォーラムでご質問ください。

今後のブログ投稿では、UDF のテスト(単体テストとエンドツーエンドのパイプライン テスト)や、CI / CD パイプラインのセットアップ(お使いのパイプライン用)について詳しく解説します。Apache Beam コードの保守なしで、UDF を更新するたびに新しいデプロイをトリガーする方法も紹介します。

 

-ソリューション アーキテクト Roy Arsanl
投稿先