データのエンコード

パイプライン データを作成または出力する際には、PCollection 内の要素とバイト文字列間のエンコード方法とデコード方法を指定する必要があります。バイト文字列は、中間ストレージでシンクからの読み取りやシンクへの書き込みを行う際に使用されます。Dataflow SDK では、コーダーというオブジェクトを使用して、特定の PCollection 内の要素のエンコード方法とデコード方法を記述します。

コーダーの使用

通常、外部ソースからパイプライン内にデータを読み込む(またはローカルデータからパイプライン データを作成する)場合や、パイプライン データを外部シンクに出力する場合には、コーダーを指定する必要があります。詳細についてはパイプライン I/O を参照してください。

Java

Dataflow SDK for Java の型 Coder では、データのエンコードやデコードに必要なメソッドが提供されています。Dataflow SDK for Java では、Java のさまざまな標準の型(IntegerLongDoubleStringUtf8、BigQuery TableRow など)と連携する Coder サブクラスが多数提供されています。利用可能な Coder サブクラスはすべて、com.google.cloud.dataflow.sdk.coders パッケージに含まれています。

パイプライン内にデータを読み込む際、コーダーは、入力データを言語固有の型(IIntegerString など)へと解釈する方法を示します。同様に、コーダーはパイプライン内の言語固有型を出力データシンク用のバイト文字列に書き込む方法や、パイプライン内の中間データを実体化する方法も示します。

Dataflow SDK では、パイプライン内のすべての PCollection(変換からの出力として生成されたものを含む)に対してコーダーが設定されます。多くの場合、Dataflow SDK では、出力 PCollection に対する適切なコーダーが自動的に推定されます。詳細については、コーダーの推定を参照してください。

コーダーと型の対応関係は、必ずしも 1:1 ではありません。たとえば、Integer 型は有効なコーダーを複数持つことがあり、入力と出力にそれぞれ異なる Integer コーダーが使用されることもあります。また、1 つの変換に、BigEndianIntegerCoder を使用する Integer 型の入力データと、VarIntCoder を使用する Integer 型の出力データが含まれることもあります。

Java

PCollection を入力または出力する際には、Coder を明示的に設定できます。パイプラインの Read または Write 変換を適用するときに、.withCoder メソッドを呼び出して、Coder を設定します。

通常、PCollection のコーダーを自動的に推定できない場合や、パイプラインのデフォルト以外のコーダーを使用したい場合には、ユーザーが Coder を設定する必要があります。次のサンプルコードでは、テキスト ファイルから一連の数値を読み取り、結果の PCollection に対して TextualIntegerCoder 型の Coder を設定しています。

  PCollection<Integer> numbers =
     p.begin()
      .apply(TextIO.Read.named("ReadNumbers")
                        .from("gs://my_bucket/path/to/numbers-*.txt")
                        .withCoder(TextualIntegerCoder.of()));

既存の PCollection に対してコーダーを設定するには、PCollection.setCoder メソッドを使用します。なお、(.apply を呼び出すなどして)すでにファイナライズされている PCollection に対しては、setCoder を呼び出すことはできません。

既存の PCollection のコーダーを取得するには、getCoder メソッドを使用します。指定された PCollection に対してコーダーが設定されておらず、コーダーを推定できない場合、このメソッドは失敗し、IllegalStateException が返されます。

コーダーの推定とデフォルト コーダー

Dataflow SDK では、パイプライン内のすべての PCollection に対してコーダーが必要です。ただし、多くの場合、コーダーをユーザーが明示的に指定する必要はありません(パイプラインの中間で変換によって生成される中間 PCollection に対するコーダーなど)。このような場合、Dataflow SDK では、PCollection の生成に使用された変換の入力と出力から、適切なコーダーを推定できます。

Java

Pipeline オブジェクトには CoderRegistry があります。CoderRegistry は、Java の型と、パイプラインが各 PCollection に対して使用するデフォルト コーダーとのマッピングを表します。

デフォルトでは、Dataflow SDK for Java は変換の関数オブジェクト(DoFn など)の型パラメータを使用して、出力 PCollection の要素に対する Coder を自動的に推定します。たとえば、ParDo の場合、DoFn<Integer, String> 関数オブジェクトは Integer 型の入力要素を受け取り、String 型の出力要素を生成します。このような場合、Dataflow SDK for Java は出力 PCollection<String> のデフォルトの Coder を自動的に推定します(デフォルト パイプラインの CoderRegistry の場合、これは StringUtf8Coder です)。

デフォルト コーダーと CoderRegistry

Pipeline オブジェクトには CoderRegistry オブジェクトがあります。このオブジェクトは、言語の型を、それらの型に対してパイプラインが使用するデフォルトのコーダーにマッピングします。CoderRegistry を使用すると、特定の型のデフォルト コーダーを参照できます。また、特定の型の新しいデフォルト コーダーを登録することもできます。

Java

CoderRegistry には、Dataflow SDK for Java を使用して作成した Pipeline の Java の標準の型と、それらに対応する Coder とのデフォルトのマッピングが含まれています。次の表は、標準のマッピングを示したものです。

Java の型 デフォルトの Coder
Double DoubleCoder
Instant InstantCoder
Integer VarIntCoder
Iterable IterableCoder
KV KvCoder
List ListCoder
Map MapCoder
Long VarLongCoder
String StringUtf8Coder
TableRow TableRowJsonCoder
Void VoidCoder
byte[] ByteArrayCoder
TimestampedValue TimestampedValueCoder

CoderRegistry.registerStandardCoders メソッドを使用すると、特定の CoderRegistry のデフォルト マッピングを設定できます。

デフォルト コーダーの参照

Java

CoderRegistry.getDefaultCoder メソッドを使用して、Java の型のデフォルトの Coder を確認できます。Pipeline.getCoderRegistry メソッドを使用して、特定の PipelineCoderRegistry にアクセスできます。これにより、Java の型のデフォルト Coder をパイプラインごとに確認(または設定)できます(特定のパイプラインで、Integer 値が BigEndianIntegerCoder でエンコードされることを確認するなど)。

型のデフォルト コーダーの設定

Java

特定のパイプラインについて Java の型のデフォルト Coder を設定するには、パイプラインの CoderRegistry を取得して変更します。Pipeline.getCoderRegistry メソッドを使用して CoderRegistry オブジェクトを取得します。その後、CoderRegistry.registerCoder メソッドを使用して目的の Java の型に新しい Coder を登録します。

次のサンプルコードは、パイプラインの Integer の値にデフォルトの Coder(この場合は BigEndianIntegerCoder)を設定する方法を示しています。

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  CoderRegistry cr = p.getCoderRegistry();
  cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);

カスタムデータ型にデフォルト コーダーのアノテーションを追加する

Java

パイプライン プログラムでカスタムデータ型を定義している場合、@DefaultCoder アノテーションを使用して、その型で使用するコーダーを指定できます。たとえば、SerializableCoder で使用するカスタムデータ型があるとします。次のように @DefaultCoder アノテーションを使用できます。

  @DefaultCoder(AvroCoder.class)
  public class MyCustomDataType {
    ...
  }

データ型に対応付けるカスタム コーダーを作成した場合、@DefaultCoder アノテーションを使用するには、コーダークラスで静的な Coder.of(Class<T>) ファクトリ メソッドを実装する必要があります。

  public class MyCustomCoder implements Coder {
    public static Coder<T> of(Class<T> clazz) {...}
    ...
  }

  @DefaultCoder(MyCustomCoder.class)
  public class MyCustomDataType {
    ...
  }