カスタムソースとカスタムシンク

Dataflow SDK では、カスタムのデータソースやシンクを作成するために使用できる、拡張可能な API が提供されています。Dataflow SDK でネイティブ サポートされていないデータソースやシンクを対象にして、パイプラインでデータの読み取りや書き込みを行いたい場合は、カスタムのデータソースやシンクを作成する必要があります。

カスタムソースを作成するには、Dataflow SDK の抽象 Source サブクラス(BoundedSourceUnboundedSource など)を拡張します。カスタムシンクは、Dataflow SDK の抽象 Sink 基底クラスを拡張して作成します。拡張可能な API を使用すると、有限(バッチ)または無限(ストリーミング)のデータを読み取るカスタムソースを作成したり、有限データのみを書き込むシンクを作成したりできます。

Dataflow では、今後のリリースで、無限データを書き込むカスタムシンクを追加サポートする予定です。

カスタムソースとカスタムシンクの基本的なコード要件

Dataflow サービスでは、デベロッパーが提供したクラスを使用して、複数のワーカー インスタンスを並行的に使用しながらデータの読み取りや書き込みを行います。そのため、デベロッパーが Source および Sink サブクラス用に提供するコードは、以下の基本要件を満たしている必要があります。

シリアル化可能

ユーザーの Source または Sink サブクラスは、有限か無限かにかかわらず、Serializable である必要があります。Dataflow サービスでは、ユーザーの Source または Sink サブクラスのインスタンスを複数作成して複数のリモート ワーカーに送り、並行的な読み取りや書き込みを可能にすることがあります。

不変性

Source または Sink サブクラスは、事実上不変である必要があります。すべてのプライベート フィールドは final で宣言される必要があり、コレクション型のすべてのプライベート変数は事実上不変である必要があります。ユーザーのクラスにセッター メソッドが含まれる場合、それらのメソッドは、関連するフィールドを修正した状態で、オブジェクトの独立コピーを返す必要があります。

Source または Sink サブクラスで変更可能状態を使用するのは、ソースの実装を要する高負荷な計算の遅延評価を使用する場合に限られます。その場合は、変更可能なすべてのインスタンス変数を transient で宣言する必要があります。

スレッドセーフである

カスタムソースを作成して Dataflow の Dynamic Work Rebalancing 機能と連携させる場合は、コードをスレッドセーフにすることが不可欠です。Dataflow SDK for Java では、これを容易にするためのヘルパークラスが提供されています。詳細については、後述の BoundedSource と Dynamic Work Rebalancing の連携を参照してください。

テスト可能である

すべての Source および Sink サブクラスを徹底的に単体テストすることが重要です。高度な機能(Dataflow の動的ワーク リバランシングなど)と連携させるクラスを作成する場合には、これが特に重要です。軽微な実装エラーが原因で、検出困難なデータ破損やデータ消失(レコードのスキップや重複など)が発生する可能性もあります。

Dataflow SDK には、ソースのテストに役立つ SourceTestUtils クラスが提供されています。SourceTestUtils には BoundedSource 実装の一部のプロパティを自動的に検証するためのユーティリティが含まれています。SourceTestUtils を使用すると、コード行数が比較的少ない各種の入力を使用して、実装をより広範囲にテストできます。

カスタムソースの作成

パイプライン用のカスタム データソースを作成するには、入力ソースからデータを読み取る方法や、データソースを複数の部分に分割する方法を Dataflow サービスに伝えるための、形式固有のロジックを提供して、複数のワーカー インスタンスがデータを並行的に読み取れるようにする必要があります。無限データを読み取るカスタム データソースを作成する場合は、ソースのウォーターマークやオプションのチェックポインティングを管理するための追加ロジックを提供する必要があります。

カスタムソース用のロジックを提供するには、次のクラスを作成します。

  • BoundedSource のサブクラス、または UnboundedSource のサブクラス。前者は有限(バッチ)データセットを読み取る場合に使用し、後者は無限(ストリーミング)データセットを読み取る場合に使用します。これらのサブクラスで、読み取りたいデータの情報を記述します、これには、データのロケーションやパラメータ(読み取るデータの量など)が含まれます。
  • Dataflow SDK クラス Source.Reader のサブクラス。各 Source には、その Source からの読み取りに関わるすべての状態をキャプチャする、Reader を関連付ける必要があります。これには、読み取るデータ形式の固有の要件に依存する、ファイル処理、RPC 接続、およびその他のパラメータが含まれます。

    Reader のクラス階層は Source 階層をミラーリングします。BoundedSource を拡張する場合は、関連する BoundedReader を提供する必要があります。UnboundedSource を拡張する場合は、関連する UnboundedReader を提供する必要があります。

ソース サブクラスの実装

ユーザーは、データが有限バッチか無限ストリームかによって、BoundedSource または UnboundedSource のサブクラスを作成する必要があります。どちらの場合も、Source サブクラスはスーパークラス内の抽象メソッドをオーバーライドする必要があります。カスタム データソースを使用する場合、Dataflow サービスはこれらのメソッドが使用して、データセットのサイズを見積もり、それを並行読み取り用に分割します。

また、Source サブクラスでは、データソースに関する基本情報も管理する必要があります。たとえば、Dataflow の DatastoreIO クラスにあるサンプル実装 Source は、Datastore からのデータ取得に使われる hostdatasetIDquery を引数として受け取ります。

BoundedSource

BoundedSource は、Dataflow サービスが(場合によって並行的に)読み取りを実行できる、有限データセットを表します。BoundedSource には、データセットを分割して複数のリモート ワーカーから読み取れるようにするためにサービスが使用する、一連の抽象メソッドが含まれます。

BoundedSource を実装するには、ユーザーのサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • splitIntoBundles: Dataflow サービスは、このメソッドを使用して有限データを指定のサイズのバンドルへと分割します。
  • getEstimatedSizeBytes: Dataflow サービスは、このメソッドを使用してデータの「合計サイズ」(バイト数)を見積もります。
  • producesSortedKeys: Dataflow サービスはこのメソッドを使用して、ソースがキー/値ペアをソート順に生成するかどうかを判断します。ソースで Key-Value ペアが生成されない場合、このメソッドの実装は false を返す必要があります。
  • createReader: この BoundedSource に関連付けられる BoundedReader を作成します。

BoundedSource および必要な抽象メソッドの実装方法を示すモデルを、Dataflow SDK の実装例 DatastoreIO で参照できます。

UnboundedSource

UnboundedSource は、Dataflow サービスが(場合によって並行的に)読み取りを実行できる、無限データ ストリームを表します。UnboundedSource には、並行的なストリーミング読み取りをサポートするためにサービスで使われる一連の抽象メソッドが含まれます。これには、障害回復のためのチェックポインティング、データ重複を防ぐためのレコード ID、およびパイプラインの下流部分のデータ完全性を見積もるためのウォーターマーキングが含まれます。

UnboundedSource を実装するには、使用するサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • generateInitialSplits: Dataflow サービスは、このメソッドを使用して UnboundedSource オブジェクトのリストを生成します。これにより、サービスが並行的に読み取るサブストリーム インスタンスの数を表します。
  • getCheckpointMarkCoder: Dataflow サービスはこのメソッドを使用して、ソースのチェックポイントのコーダーを取得します(存在する場合)。
  • requiresDeduping: Dataflow サービスはこのメソッドを使用して、データから重複レコードを明示的に削除する必要があるかどうかを決定します。このメソッドが true を返す場合、Dataflow サービスは、ソース出力から重複を削除するステップを自動的に挿入します。
  • createReader: この UnboundedSource に関連付けられる UnboundedReader を作成します。

Reader サブクラスの実装

ユーザーは、BoundedReader または UnboundedReader のサブクラスを作成して、ソース サブクラスの createReader メソッドから返されるようにする必要があります。Dataflow サービスは、これらのメソッドを Reader 内で(有限か無限かにかかわらず)使用して、データセットの実際の読み取りを実行します。

BoundedReaderUnboundedReader には類似の基本インターフェースがあり、ユーザーがこれらを定義する必要があります。これに加えて、無限データの処理用に実装する必要がある UnboundedReader 固有のメソッドや、BoundedReader で Dataflow の動的ワーク リバランシング機能を使うために実装できるオプション メソッドもあります。また、UnboundedReader を使用する場合には、start() メソッドと advance() メソッドのセマンティクスに若干の違いがあります。

BoundedReader と UnboundedReader の両方に共通する Reader メソッド

Dataflow では、BoundedReaderUnboundedReader を使用してデータを読み取るために、次のメソッドが使用されます。

  • start: Reader を初期化し、最初に読み取るレコードへと進みます。このメソッドは、Dataflow がデータの読み取りを開始する際に 1 回だけ明示的に呼び出されるもので、初期化のために必要とされる高負荷なオペレーションの配置先として適しています。
  • advance: Reader を次の有効なレコードへと進めます。このメソッドは、利用可能な入力がなくなった場合に false を返す必要があります。BoundedReader では、advance が false を返したら読み取りを停止する必要がありますが、UnboundedReader では、以降の呼び出しでストリームから追加でデータが提供された場合に、true を返すことができます。
  • getCurrent: startadvance によって最後に読み取られた、現在位置のデータレコードを返します。
  • getCurrentTimestamp: 現在のデータレコードのタイムスタンプを返します。getCurrentTimestamp をオーバーライドする必要があるのは、内的タイムスタンプがあるデータをソースが読み取る場合だけです。Dataflow サービスはこの値を使用して、結果の出力 PCollection 内にある各要素の内的タイムスタンプを設定します。

UnboundedReader に固有の Reader メソッド

基本的な Reader インターフェースに加えて、UnboundedReader には無限データソースからの読み取りを管理するための追加メソッドがいくつかあります。

  • getCurrentRecordId: 現在のレコードの一意な識別子を返します。Dataflow サービスは、これらのレコード ID を使用して重複レコードを除外します。データの各レコードに論理 ID がある場合は、このメソッドでそれらを返すことができます。その他の場合は、少なくとも 128 ビットのハッシュを使用して、レコード コンテンツのハッシュを返すことができます。(32 ビットハッシュは、一般的に競合の防止用として不十分であるため、Java の Object.hashCode() を使用することは推奨されていません。)
  • 注: 各レコードを一意に識別するチェックポインティング スキームをソースで使用している場合、getCurrentRecordId を実装するかどうかはオプションです。ただし、ソースにデータを書き込む上流システムで重複レコードが生成される場合があり、ソースがそれらを読み取る可能性がある場合には、レコード ID は有用です。

  • getWatermark: Reader から提供されたウォーターマークを返します。ウォーターマークは、Reader によって今後読み取られる要素のタイムスタンプについて、その下限を概算したものです。Dataflow サービスは、ウォーターマークをデータ完全性の見積り値として使用します。ウォーターマークは、Dataflow のウィンドウ処理機能とトリガー機能で使用されます。
  • getCheckpointMark: Dataflow サービスは、このメソッドを使用してデータ ストリーム内にチェックポイントを作成します。チェックポイントは UnboundedReader の進捗状況を表すもので、障害回復のために使用できます。チェックポインティングの方法はデータ ストリームによって異なる場合があり、受け取ったレコードの承認を必要とするソースもあれば、位置的なチェックポインティングを使用するソースもあります。ユーザーは、最も適切なチェックポインティング スキームを考慮したうえで、このメソッドを調整する必要があります。たとえば、直近の承認済みレコードを返す必要がある場合は、このメソッドをそのように調整します。
  • 注: getCheckpointMark はオプションであり、データに有用なチェックポイントがない場合は、実装する必要がありません。ただし、ソースでチェックポインティングを実装しなかった場合、データソースがエラー発生時にレコードの再送信を試行するかどうかによって、重複データやデータ消失が発生する可能性があります。

BoundedSource と Dynamic Work Rebalancing の連携

ソースが有限データを提供する場合は、splitAtFraction メソッドを実装して、BoundedReader を Dataflow サービスの Dynamic Work Rebalancing 機能と連携させることができます。Dataflow サービスでは、指定の Reader の startadvancesplitAtFraction を同時に呼び出して、Source 内の残りのデータを他のワーカーに分割して配布できます。

splitAtFraction を実装する場合は、相互に排他的な分割データセットをコードで生成する「必要があり」、それらの分割データの結合体が全体的なデータセットと一致するようにする必要があります。

Source と Reader の便利な基底クラス

Dataflow SDK には、共通のデータ ストレージ形式(ファイルなど)と連携する Source クラスや Reader クラスを作成する際に役立つ、便利な抽象基底クラスが含まれています。

FileBasedSource

データソースでファイルを使用する場合、Dataflow SDK for Java 内の FileBasedSource および FileBasedReader 抽象基底クラスから Source クラスと Reader クラスを導出できます。FileBasedSource は、ファイルとやり取りする Dataflow ソースの共通コードを実装する、有限ソース サブクラスです。これには次の機能が含まれます。

  • ファイル パターン展開
  • シーケンシャル レコード読み取り
  • 分割点

XmlSource

データソースで XML 形式のファイルを使用する場合、Dataflow SDK for Java 内の XmlSource 抽象基底クラスから Source クラスを導出できます。XmlSourceFileBasedSource を拡張し、XML ファイルを解析するための追加メソッドを提供します(ファイルのルートやファイル内の個別レコードを指定する XML 要素の設定など)。

カスタムソースからの読み取り

パイプライン内のカスタムソースからデータを読み取るには、SDK の汎用 Read 変換を適用し、.from オペレーションを使用してカスタムソースをパラメータとして渡します。

Java

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

カスタムシンクの作成

パイプライン用のカスタム データシンクを作成するには、形式固有のロジックを提供する必要があります。これにより、パイプラインの PCollection から出力シンク(ディレクトリまたファイル システム、データベース テーブルなど)に有限データを書き込む方法を Dataflow サービスに指示します。Dataflow サービスは複数のワーカーを使用して、複数のデータバンドルを並行的に書き込みます。

注: 現在、Dataflow でカスタム出力シンクへの書き込みがサポートされているのは、有限データのみです。

書き込みロジックを提供するには、次のクラスを作成します。

  • Dataflow SDK の抽象基底クラス Sink のサブクラス。 Sink は、パイプラインで並行的な書き込みが行える場所とリソースを記述しています。Sink サブクラスには、リソースやファイル ロケーション、データベース テーブル名などのフィールドを含めることができます。
  • Sink.WriteOperation のサブクラス。 Sink.WriteOperation は、Sink に記述された出力場所に対する単一の並行書き込みオペレーションの状態を表します。WriteOperation サブクラスでは、並行書き込みの初期化とファイナライズのプロセスを定義する必要があります。
  • Sink.Writer のサブクラス。 Sink.Writer は、入力 PCollection 内にある要素のバンドルを、指定されたデータシンクに書き込みます。

Sink サブクラスの実装

Sink サブクラスでは、パイプラインの出力の書き込み先となる場所やリソースを記述します。これには、ファイル システムの場所、データベース テーブル名、データセット名などを含めることができます。Sink サブクラスは、出力場所が書き込み可能であることを検証する必要があり、その出力場所にデータを書き込む方法を定義する WriteOperation を作成する必要があります。

Sink を実装するには、ユーザーのサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • validate: このメソッドは、パイプライン データの出力場所が有効であり、かつ書き込み可能であることを確認します。validate では、ファイルを開けること、出力ディレクトリが存在すること、ユーザーがデータベース テーブルのアクセス権限を持っていることなどを確認する必要があります。Dataflow サービスは、パイプラインの作成時に validate を呼び出します。
  • createWriteOperation: このメソッドによって作成される Sink.WriteOperation オブジェクトは、出力場所への書き込み方法を定義します。

WriteOperation サブクラスの実装

WriteOperation サブクラスでは、Sink で定義された出力場所に要素のバンドルを書き込む方法を定義します。WriteOperation は、並行書き込みのために必要な初期化ファイナライズを実行します。

WriteOperation を実装するには、ユーザーのサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • initialize: このメソッドは、出力場所への書き込み前に必要な初期化を実行します。Dataflow サービスは、書き込みの開始前にこのメソッドを呼び出します。ユーザーは initialize を使用して、(たとえば)一時的な出力ディレクトリを作成できます。
  • finalize: このメソッドは、Writer クラスによって実行された書き込みの結果を処理します。実装する finalize では、書き込みが失敗した後や正常に再試行された後のクリーンアップを実行したり、書き込みの失敗によって書き込まれた一時的な出力や部分的な出力の場所を特定できるようにする必要があります。

    書き込みが失敗した場合や再試行された場合には finalize が複数回呼び出される可能性があるため、finalize の実装はアトミックにするのがベスト プラクティスです。それができない場合は、finalize の実装をべき等にする必要があります。
  • createWriter: このメソッドによって作成される Sink.Writer オブジェクトは、Sink で定義された出力場所にデータのバンドルを書き込みます。

Writer サブクラスの実装

Writer サブクラスでは、Sink で定義された出力場所に単一のレコード バンドルを書き込むためのロジックを実装します。Dataflow サービスでは、同じワーカーの複数のスレッドで Writer のインスタンスが複数インスタンス化される可能性があるため、静的メンバーやメソッドへのアクセスがすべてスレッドセーフである必要があります。

Writer を実装するには、ユーザーのサブクラスで次の抽象メソッドをオーバーライドする必要があります。

  • open: このメソッドは、書き込まれるレコード バンドルの初期化を実行します(書き込み用の一時ファイルの作成など)。Dataflow サービスは、書き込みの開始時にこのメソッドを 1 回呼び出し、書き込まれるレコード バンドルの一意なバンドル ID をこのメソッドに渡します。
  • write: このメソッドは、単一のレコードを出力場所に書き込みます。Dataflow サービスは、バンドル内の各値に対して write を呼び出します。
  • close: このメソッドは書き込みを終了し、バンドルの書き込みに使用されたすべてのリソースをクローズします。close では書き込み結果を返す必要があります。書き込み結果は、成功した書き込みを特定するために、このメソッドを包含する WriteOperation によって使用されます。Dataflow サービスは、書き込みの終了時にこのメソッドを 1 回呼び出します。

バンドル ID の処理

サービスは Writer.open を呼び出すと、書き込まれるレコードの一意なバンドル ID を渡します。Writer はこのバンドル ID を使用して、自身の出力が、並行的に作成された他の Writer インスタンスのものと干渉しないことを確認する必要があります。Dataflow サービスでは、オペレーションが失敗した場合に書き込みが複数回再試行される可能性があるため、この確認は特に重要です。

たとえば、Sink の出力がファイルベースである場合、Writer クラスはバンドル ID をファイル名の接尾辞として使用することで、Writer でレコードを書き込んだ出力先が、他の Writer によって使用されていない固有の出力ファイルであることを確認できます。その後、Writerclose メソッドで、ファイルの場所を書き込み結果の一部として返すことができます。

SinkWriteOperationWriter と、それらに必要な抽象メソッドの実装方法のモデルを、Dataflow SDK の実装例 DatastoreIO で参照できます。

Sink と Writer の便利な基底クラス

Dataflow SDK には、共通のデータ ストレージ形式(ファイルなど)と連携する Source クラスや Reader クラスを作成する際に役立つ、便利な抽象基底クラスが含まれています。

FileBasedSink

データソースでファイルを使用する場合、Dataflow SDK for Java 内の抽象基底クラス FileBasedSinkFileBasedWriteOperationFileBasedWriter から SinkWriteOperationWriter クラスを導出できます。これらのクラスは、ファイルと情報をやり取りする Dataflow ソースの共通コードを実装します。これには次の機能が含まれます。

  • ファイルのヘッダーとフッターの設定
  • シーケンシャル レコード書き込み
  • 出力 MIME タイプの設定

FileBasedSink とそのサブクラスは、ローカル ファイルへの書き込みと Google Cloud Storage 内のファイルへの書き込みの両方をサポートしています。詳細については、Dataflow SDK for Java の XmlSink という FileBasedSink の実装例をご覧ください。

カスタムシンクへの書き込み

パイプライン内のカスタムシンクにデータを書き込むには、SDK の汎用 Write 変換を適用し、.to オペレーションを使用してカスタムシンクをパラメータとして渡します。

Java

  p.apply("WriteResults", Write.to(new MySink()));
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。