Avro I/O

Avro ファイル用の組み込み Read 変換および Write 変換は、AvroIO に含まれています。AvroIO は、ローカル ファイル(Java プログラムが実行されるシステム上のファイル)と、Google Cloud Storage 内のリモート ファイルの両方の読み取りと書き込みに使用できます。

Java

注: パイプラインでローカル ファイルの読み取りや書き込みを行う場合は、DirectPipelineRunner を使用して、パイプラインをローカルで実行する必要があります。Dataflow サービスがパイプラインの実行に使用する Google Compute Engine インスタンスは、読み取りや書き込みのためにローカルマシン上のファイルにアクセスできないためです。

Avro スキーマの指定

AvroIO を使用するには、読み取りや書き込みの対象レコードを記述する Avro スキーマを指定する必要があります。Avro では、データのシリアル化方法を記述するためにスキーマが使用されます。Avro スキーマの動作のしくみについては、Avro のドキュメントを参照してください。

特定の種類の Avro レコードを読み取るには Avro で生成されたクラスタイプを提供します。一方、GenericRecords を読み取るには org.apache.avro.Schema オブジェクトを提供します。通常は、スキーマ ファイル(.avsc)から Schema オブジェクトを読み取ります。また、JSON でエンコードされた文字列形式で Schema を指定することもできます。

スキーマを提供するには、AvroIO 変換で .withSchema メソッドを使用します。AvroIO.Read または AvroIO.Write を使用する場合は、必ず .withSchema を呼び出す必要があります。

AvroIO を使用した読み取り

AvroIO.Read 変換は、1 つ以上の Avro ファイルからレコードを読み取り、各要素が 1 つのレコードを表す PCollection を作成します。AvroIO.Read では、自動生成された Avro クラス オブジェクトか、GenericRecord オブジェクトの PCollection を生成できます。生成される PCollection の種類は、選択したスキーマタイプによって決まります。

自動生成された Avro クラスを使用すると、PCollection の要素はその Avro クラスタイプのオブジェクトになります。以下に例を示します。

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<AvroAutoGenClass> records = p.apply(
    AvroIO.Read.named("ReadFromAvro")
          .from("gs://some/inputData.avro")
          .withSchema(AvroAutoGenClass.class));

Avro ファイルを PCollection<GenericRecord> に読み込むには、org.apache.avro.Schema オブジェクトを渡すか、JSON エンコード文字列として記述されたスキーマを渡します。次のサンプルコードは、org.apache.avro.Schema オブジェクトを取得するため、.avsc ファイルを解析し、結果の Schema を使用して Google Cloud Storage からの分割済み入力 Avro ファイルを読み取ります。

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

  PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
                       .from("gs://my_bucket/path/records-*.avro")
                       .withSchema(schema));

他のファイルベース Dataflow ソースと同様、AvroIO.Read 変換は複数の入力ファイルを読み取ることができます。ファイルベースのソースを読み取る際に複数のファイルを処理する方法について詳しくは、入力データを読み取るをご覧ください。

AvroIO を使用した書き込み

AvroIO.Write 変換は、Avro レコードの PCollection を 1 つ以上の Avro ファイルに書き込みます。AvroIO.Write を使用するには、最終的な出力データを、自動生成された Avro クラス オブジェクトの PCollection か、GenericRecordsPCollection として表す必要があります。ParDo を使用すると、データを適切に変換できます。

特定のレコードを書き込むには、自動生成された Avro クラスを Avro スキーマとして使用します。

Java

  PCollection<AvroAutoGenClass> filteredRecords = ...;
  filteredRecords.apply(AvroIO.Write.named("WriteToAvro")
                                    .to("gs://some/outputData.avro")
                                    .withSchema(AvroAutoGenClass.class)
                                    .withSuffix(".avro"));

GenericRecord オブジェクトを書き込むには、org.apache.avro.Schema を渡すか(多くの場合、.avsc ファイルの解析で取得)、JSON エンコード文字列として記述されたスキーマを渡します。次のコードサンプルは、.avsc ファイルを解析して Schema オブジェクトを取得し、それを使用して共有の出力 Avro ファイルを Google Cloud Storage に書き込みます。

Java

  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

  PCollection<GenericRecord> records = ...;
  records.apply(AvroIO.Write.named("WriteToAvro")
                            .to("gs://my_bucket/path/numbers")
                            .withSchema(schema)
                            .withSuffix(".avro"));

デフォルトでは、AvroIO.Write は複数の出力ファイルに書き込みを行います。詳細は、出力データを書き込むをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。