カスタムソースとカスタムシンク

Dataflow SDK では、カスタムのデータソースやシンクを作成するために使用できる、拡張可能な API が提供されています。Dataflow SDK でネイティブ サポートされていないデータソースやシンクを対象にして、パイプラインでデータの読み取りや書き込みを行いたい場合は、カスタムのデータソースやシンクを作成する必要があります。

カスタムソースを作成するには、Dataflow SDK の抽象 Source サブクラス(BoundedSourceUnboundedSource など)を拡張します。カスタムシンクは、Dataflow SDK の抽象 Sink 基底クラスを拡張して作成します。拡張可能な API を使用すると、有限(バッチ)または無限(ストリーミング)のデータを読み取るカスタムソースを作成できるほか、有限データのみを書き込むシンクを作成できます。

Dataflow では、今後のリリースで、無限データを書き込むカスタムシンクを追加サポートする予定です。

カスタムソースとカスタムシンクの基本的なコード要件

Dataflow サービスでは、デベロッパーが提供したクラスを使用して、複数のワーカー インスタンスを並行的に使用しながらデータの読み取りや書き込みを行います。そのため、デベロッパーが Source および Sink サブクラス用に提供するコードは次の基本的な要件を満たす必要があります。

直列化可能性

Source または Sink サブクラスは、有限か無限かにかかわらず、Serializable である必要があります。Dataflow サービスは、Source または Sink サブクラスのインスタンスを複数作成して複数のリモート ワーカーに送信して、並行的な読み取りや書き込みを可能にすることがあります。

不変性

Source または Sink サブクラスは、事実上不変である必要があります。すべてのプライベート フィールドは final で宣言される必要があり、コレクション型のすべてのプライベート変数は事実上不変である必要があります。ユーザーのクラスにセッター メソッドが含まれる場合、それらのメソッドは、関連するフィールドを修正した状態で、オブジェクトの独立コピーを返す必要があります。

Source または Sink サブクラスで変更可能な状態を使用するのは、ソースを実装するために必要な高負荷な計算の遅延評価を使用している場合に限られます。その場合は、すべての可変インスタンス変数を transient で宣言する必要があります。

スレッドセーフである

カスタムソースを作成して Dataflow の Dynamic Work Rebalancing 機能と連携させる場合は、コードをスレッドセーフにすることが不可欠です。Dataflow SDK for Java では、これを容易にするためのヘルパークラスが提供されています。詳細については、後述の BoundedSource と Dynamic Work Rebalancing の連携を参照してください。

テスト可能である

すべての Source および Sink サブクラスを完全に単体テストすることが不可欠です。特に、Dataflow の動的ワーク リバランシングのような高度な機能と連携させるクラスを作成する場合には、これが重要です。軽微な実装エラーが原因で、検出困難なデータ破損やデータ消失(レコードのスキップや重複など)が発生する可能性もあります。

Dataflow SDK は、ソースのテストを補助するために、SourceTestUtils クラスを提供しています。SourceTestUtils には BoundedSource 実装の一部のプロパティを自動的に検証するためのユーティリティが含まれています。SourceTestUtils を使用すると、コード行数が比較的少ない各種の入力を使用して実装のテスト カバレッジを向上させることができます。

カスタムソースの作成

パイプライン用のカスタム データソースを作成するには、入力ソースからデータを読み取る方法や、データソースを複数の部分に分割する方法を Dataflow サービスに伝えるための、形式固有のロジックを提供して、複数のワーカー インスタンスがデータを並行的に読み取れるようにする必要があります。無限データを読み取るカスタム データソースを作成する場合は、ソースのウォーターマークやオプションのチェックポインティングを管理するための追加ロジックを提供する必要があります。

カスタムソース用のロジックを提供するには、次のクラスを作成します。

  • BoundedSource のサブクラス、または UnboundedSource のサブクラス。前者は有限(バッチ)データセットを読み取る場合に使用し、後者は無限(ストリーミング)データセットを読み取る場合に使用します。これらのサブクラスで、読み取りたいデータの情報を記述します。これには、データの場所やパラメータ(読み取るデータの量など)が含まれます。
  • Dataflow SDK Source.Reader クラスのサブクラス。それぞれの Source には、その Source からの読み取りに関わるすべての状態をキャプチャする、Reader が関連付けられている必要があります。これには、読み取るデータ形式の固有の要件に依存する、ファイル処理、RPC 接続、およびその他のパラメータが含まれます。

    Reader クラス階層は、Source 階層をミラーリングします。BoundedSource を拡張する場合は、関連する BoundedReader を提供する必要があります。UnboundedSource を拡張する場合は、関連する UnboundedReader を提供する必要があります。

ソース サブクラスの実装

ユーザーは、データが有限バッチか無限ストリームかによって BoundedSource または UnboundedSource のサブクラスを作成する必要があります。いずれの場合も、Source サブクラスは、スーパークラス内の抽象メソッドをオーバーライドする必要があります。カスタム データソースを使用する場合、Dataflow サービスはこれらのメソッドを使用して、データセットのサイズを見積もり、それを並行読み取り用に分割します。

また、Source サブクラスでは、場所などのデータソースに関する基本情報も管理する必要があります。たとえば、Dataflow の DatastoreIO クラスにあるサンプル実装 Source は、Datastore からデータを取得するために使用される hostdatasetIDquery を引数として受け取ります。

BoundedSource

BoundedSource は、Dataflow サービスが(場合によって並行的に)読み取りを実行できる、有限のデータセットを表します。BoundedSource は、データセットを分割して複数のリモート ワーカーから読み取れるようにするためにサービスが使用する、一連の抽象メソッドを格納します。

BoundedSource を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • splitIntoBundles: Dataflow サービスは、このメソッドを使用して、有限データを特定のサイズのバンドルに分割します。
  • getEstimatedSizeBytes: Dataflow サービスは、このメソッドを使用して、データの合計サイズ(バイト数)を見積もります。
  • producesSortedKeys: 並べ替えられた順序でソースが Key-Value ペアを生成するかどうかを、Dataflow サービスに通知するメソッドです。ソースで Key-Value ペアが生成されない場合、このメソッドの実装は false を返す必要があります。
  • createReader: この BoundedSource に関連付けられる BoundedReader を作成します。

BoundedSource および必要な抽象メソッドの実装方法のモデルを、Dataflow SDK の実装例 DatastoreIO で参照できます。

UnboundedSource

UnboundedSource は、Dataflow サービスが(場合によっては並行的に)読み取り可能な無限のデータ ストリームを表します。UnboundedSource は、並行的なストリーミング読み取りをサポートするためにサービスが使用する一連の抽象メソッドを格納します。これには、障害復旧のためのチェックポインティング、データの重複を防ぐためのレコード ID、およびパイプラインの下流部分のデータの完全性を見積もるためのウォーターマーキングが含まれます。

UnboundedSource を実装するには、使用するサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • generateInitialSplits: Dataflow サービスは、このメソッドを使用して、UnboundedSource オブジェクトのリストを生成します。これにより、サービスが並行して読み取る必要があるサブストリーム インスタンスの数を表します。
  • getCheckpointMarkCoder: Dataflow サービスは、このメソッドを使用して、ソースのチェックポイントのコーダーを取得します(存在する場合)。
  • requiresDeduping: Dataflow サービスは、このメソッドを使用して、データから重複するレコードを明示的に削除する必要があるかどうかを判断します。このメソッドが true を返す場合、Dataflow サービスは、ソースの出力から重複を削除するステップを自動的に挿入します。
  • createReader: この UnboundedSource に関連付けられる UnboundedReader を作成します。

Reader サブクラスの実装

ユーザーは、BoundedReader または UnboundedReader のサブクラスを作成して、ソース サブクラスの createReader メソッドによって返されるようにする必要があります。Dataflow サービスは、これらのメソッドを Reader 内で(有限か無限かにかかわらず)使用して、データセットの実際の読み取りを実行します。

BoundedReaderUnboundedReader には同様の基本インターフェースがあり、ユーザーがこれらを定義する必要があります。さらに、他にも無限データを処理するために実装する必要がある UnboundedReader メソッドと、BoundedReader で Dataflow の動的ワーク リバランシング機能を使うために実装できるオプション メソッドもあります。また、UnboundedReader を使用する場合には、start() メソッドと advance() メソッドのセマンティクスに小さな違いがあります。

BoundedReader と UnboundedReader の両方に共通する Reader メソッド

Dataflow では、BoundedReader または UnboundedReader を使用してデータを読み取るために、次のメソッドが使用されます。

  • start: Reader を初期化し、読み込まれる最初のレコードに進みます。このメソッドは、Dataflow がデータの読み取りを開始する際に 1 回だけ明示的に呼び出されるもので、初期化のために必要とされる高負荷なオペレーションの配置先として適しています。
  • advance: Reader を次の有効なレコードに進めます。利用可能な入力がそれ以上ない場合、このメソッドは false を返す必要があります。BoundedReader では、advance が false を返したら読み取りを停止する必要がありますが、UnboundedReader では、以降の呼び出しでストリームから追加でデータが提供された場合に、true を返すことができます。
  • getCurrent: start または advance によって最後に読み取られた現在の位置のデータレコードを返します。
  • getCurrentTimestamp: 現在のデータレコードのタイムスタンプを返します。ソースが内的タイムスタンプを持つデータを読み取る場合にのみ、getCurrentTimestamp をオーバーライドする必要があります。Dataflow サービスはこの値を使用して、結果の出力 PCollection 内にある各要素の内的タイムスタンプを設定します。

UnboundedReader に固有の Reader メソッド

基本的な Reader インターフェースに加えて、UnboundedReader には無限データソースからの読み取りを管理するための追加のメソッドがいくつかあります。

  • getCurrentRecordId: 現在のレコードの一意の識別子を返します。Dataflow サービスは、これらのレコード ID を使用して重複レコードを除外します。データの各レコードに論理 ID がある場合は、このメソッドでそれらを返すことができます。その他の場合は、少なくとも 128 ビットのハッシュを使用して、レコード コンテンツのハッシュを返すことができます(Java の Object.hashCode() を使用することはおすすめしません。一般に、32 ビットのハッシュでは競合を防止するには不十分です)。
  • 注: ソースが各レコードを一意に識別するチェックポインティング スキームを使用する場合、getCurrentRecordId は省略可能です。ただし、ソースにデータを書き込む上流システムで重複レコードが生成される場合があり、ソースがそれらを読み取る可能性がある場合には、レコード ID は有用です。

  • getWatermark: Reader から提供されたウォーターマークを返します。ウォーターマークは、Reader によって今後読み取られる要素のタイムスタンプについて、その下限を概算したものです。Dataflow サービスは、ウォーターマークをデータ完全性の見積り値として使用します。ウォーターマークは、Dataflow のウィンドウ処理機能とトリガー機能で使用されます。
  • getCheckpointMark: Dataflow サービスは、このメソッドを使用してデータストリームにチェックポイントを作成します。チェックポイントは、障害復旧に使用できる UnboundedReader の進行状況を表します。チェックポインティングの方法はデータ ストリームによって異なる場合があり、受け取ったレコードの承認を必要とするソースもあれば、位置的なチェックポインティングを使用するソースもあります。ユーザーは、最も適切なチェックポインティング スキームを考慮したうえで、このメソッドを調整する必要があります。たとえば、直近の承認済みレコードを返す必要がある場合は、このメソッドをそのように調整します。
  • 注: getCheckpointMark はオプションです。データに有用なチェックポイントがない場合は実装する必要はありません。ただし、ソースでチェックポインティングを実装しなかった場合、データソースがエラー発生時にレコードの再送信を試行するかどうかによって、重複データやデータ消失が発生する可能性があります。

BoundedSource と Dynamic Work Rebalancing の連携

ソースが有限データを提供している場合は、splitAtFraction メソッドを実装して、BoundedReader を Dataflow サービスの Dynamic Work Rebalancing 機能と連携させることができます。Dataflow サービスは、指定のリーダー上で start または advancesplitAtFraction を同時に呼び出して、Source 内の残りのデータを他のワーカーに分割して再配布できます。

splitAtFraction の実装時、相互に排他的な分割データセットをコードで生成する必要があり、これらの分割データの結合体が全体的なデータセットと一致するようにする必要があります。

Source と Reader の便利な基底クラス

Dataflow SDK には、ファイルなどの共通のデータ ストレージ形式と連携する Source クラスと Reader クラスを作成する際に役立つ、便利な抽象基底クラスが含まれています。

FileBasedSource

データソースでファイルを使用している場合、Dataflow SDK for Java の FileBasedSource および FileBasedReader 抽象基底クラスから Source クラスと Reader クラスを導出できます。FileBasedSource は、ファイルとやり取りする Dataflow ソースの共通のコードを実装する、有限ソース サブクラスです。これには次の機能が含まれます。

  • ファイル パターン展開
  • シーケンシャル レコード読み取り
  • 分割点

XmlSource

データソースで XML 形式のファイルを使用している場合、Dataflow SDK for Java の XmlSource 抽象基底クラスから Source クラスを導出できます。XmlSourceFileBasedSource を拡張し、ファイルのルートやファイル内の個々のレコードを指定する XML 要素の設定など、XML ファイルを解析するための追加のメソッドを提供します。

カスタムソースからの読み取り

パイプライン内のカスタムソースからデータを読み取るには、SDK の一般的な Read 変換を適用し、.from オペレーションを使用してカスタムソースをパラメータとして渡します。

Java

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

カスタムシンクの作成

パイプラインのカスタム データシンクを作成するには、形式固有のロジックを指定する必要があります。これによりパイプラインの PCollection から出力シンク(ディレクトリまたはファイル システム、データベース テーブルなど)に有限データを書き込む方法を Dataflow サービスに指示します。Dataflow サービスは、複数のワーカーを使用して複数のデータバンドルを並行して書き込みます。

注: 現在、Dataflow でカスタム出力シンクへの書き込みがサポートされているのは、有限データのみです。

書き込みロジックを提供するには、次のクラスを作成します。

  • Dataflow SDK の抽象基底クラス Sink のサブクラス。Sink は、パイプラインが並行的に書き込むことができる場所またはリソースを記述します。Sink サブクラスには、リソースやファイル ロケーション、データベース テーブル名などのフィールドを含めることができます。
  • サブクラス Sink.WriteOperationSink.WriteOperation は、Sink に記述された出力場所への単一の並行書き込み操作の状態を表します。WriteOperation サブクラスは、並行書き込みの初期化とファイナライズのプロセスを定義する必要があります。
  • サブクラス Sink.WriterSink.Writer は、入力 PCollection 内にある要素のバンドルを、指定したデータシンクに書き込みます。

Sink サブクラスの実装

Sink サブクラスは、パイプラインが出力を書き込む場所またはリソースを記述します。これには、ファイル システムの場所、データベース テーブルやデータセットの名前などが含まれます。Sink サブクラスは、出力場所が書き込み可能であることを検証する必要があり、その出力場所にデータを書き込む方法を定義する WriteOperation を作成する必要があります。

Sink を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • validate: このメソッドは、パイプライン データの出力場所が有効で、書き込むことができることを確認します。validate では、ファイルを開けること、出力ディレクトリが存在し、ユーザーがデータベース テーブルへのアクセス権限があることを確認する必要があります。Dataflow サービスは、パイプライン作成時に validate を呼び出します。
  • createWriteOperation: このメソッドは、出力場所への書き込み方法を定義する Sink.WriteOperation オブジェクトを作成します。

WriteOperation サブクラスの実装

WriteOperation サブクラスでは、Sink で定義されている出力場所に要素のバンドルを書き込む方法を定義します。WriteOperation は、並行書き込みのために必要な初期化ファイナライズを実行します。

WriteOperation を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • initialize: このメソッドは、出力場所に書き込む前に必要な初期化を実行します。Dataflow サービスは、書き込みの開始前にこのメソッドを呼び出します。initialize を使用すると、たとえば、一時的な出力ディレクトリを作成します。
  • finalize: このメソッドは、Writer クラスによって実行された書き込みの結果を処理します。finalize の実装では、書き込みが失敗した後や正常に再試行された後のクリーンアップを実行し、失敗した書き込みによって書き込まれた一時的な出力または部分的な出力の場所を特定できなければなりません。

    書き込みが失敗した場合や再試行の場合は、finalize が複数回呼び出される可能性があるため、finalize の実装はアトミックにするのがベスト プラクティスです。それができない場合は、finalize の実装をべき等にする必要があります。
  • createWriter: このメソッドは、Sink で定義された出力場所にデータのバンドルを書き込む Sink.Writer オブジェクトを作成します。

Writer サブクラスの実装

Writer サブクラスでは、Sink で定義された出力場所に単一のレコード バンドルを書き込むためのロジックを実装します。Dataflow サービスは、同じワーカーの複数のスレッドで Writer のインスタンスが複数インスタンス化される可能性があるため、静的メンバーやメソッドへのアクセスがすべてスレッドセーフである必要があります。

Writer を実装するには、サブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • open: このメソッドは、書き込み用の一時ファイルの作成など、書き込み対象のレコード バンドルの初期化を行います。Dataflow サービスは、書き込みの開始時にこのメソッドを 1 回呼び出し、書き込まれるレコード バンドルの一意なバンドル ID をこのメソッドに渡します。
  • write: このメソッドは、単一のレコードを出力場所に書き込みます。Dataflow サービスは、バンドル内の各値に対して write を呼び出します。
  • close: このメソッドは書き込みを終了し、バンドルの書き込みに使用されたすべてのリソースをクローズします。close では書き込み結果を返す必要があります。書き込み結果は、成功した書き込みを識別するために、このメソッドを包含する WriteOperation によって使用されます。Dataflow サービスは、書き込みの終了時にこのメソッドを 1 回呼び出します。

バンドル ID の処理

サービスは Writer.open を呼び出すと、書き込まれるレコードの一意のバンドル ID を渡します。Writer はこのバンドル ID を使用して、自身の出力が並行的に作成された他の Writer インスタンスのものと干渉しないことを確認する必要があります。Dataflow サービスでは、オペレーションが失敗した場合に書き込みが複数回再試行される可能性があるため、この確認は特に重要です。

たとえば、Sink の出力がファイルベースである場合、Writer クラスはバンドル ID をファイル名の接尾辞として使用することで、Writer でレコードを書き込んだ出力先が、他の Writer によって使用されていない一意の出力ファイルであることを確認できます。その後、Writerclose メソッドは、ファイルの場所を書き込み結果の一部として返すことができます。

SinkWriteOperation、および Writer を、それらに必要な抽象メソッドと一緒に実装する方法のモデルを、Dataflow SDK の実装例 DatastoreIO で確認できます。

Sink と Writer の便利な基底クラス

Dataflow SDK には、ファイルなどの共通のデータ ストレージ形式と連携する Source クラスと Reader クラスを作成する際に役立つ、便利な抽象基底クラスが含まれています。

FileBasedSink

データソースでファイルを使用している場合、Dataflow SDK for Java の抽象基底クラス FileBasedSinkFileBasedWriteOperation、および FileBasedWriter から SinkWriteOperation、および Writer クラスを導出できます。これらのクラスは、ファイルと情報をやり取りする Dataflow ソースの共通コードを実装します。これには次の機能が含まれます。

  • ファイルのヘッダーとフッターの設定
  • シーケンシャル レコード書き込み
  • 出力 MIME タイプの設定

FileBasedSink とそのサブクラスは、ローカル ファイルへの書き込みと Google Cloud Storage 内のファイルへの書き込みの両方をサポートしています。詳細については、Dataflow SDK for Java の XmlSink と呼ばれる FileBasedSink の実装例をご覧ください。

カスタムシンクへの書き込み

パイプラインのカスタムシンクにデータを書き込むには、SDK 汎用 Write 変換を適用して、.to オペレーションを使用してカスタムシンクをパラメータとして渡します。

Java

  p.apply("WriteResults", Write.to(new MySink()));