E/S de Avro

Las transformaciones integradas Read y Write para los archivos Avro se incluyen en AvroIO. Puedes usar AvroIO para operaciones de lectura y escritura de archivos locales (es decir, archivos en el sistema en el que se ejecuta el programa Java) y archivos remotos en Google Cloud Storage.

Java

Nota: Si deseas que la canalización lea o escriba archivos locales, deberás usar DirectPipelineRunner para ejecutar la canalización de manera local. Esto se debe a que las instancias de Google Compute Engine que usan el servicio de Dataflow a fin de ejecutar la canalización no podrán acceder a los archivos en la máquina local para realizar operaciones de lectura y escritura.

Especifica un esquema Avro

Para usar AvroIO, deberás especificar un esquema Avro que describa los registros que se leerán o escribirán. Avro se basa en esquemas para describir cómo se serializan los datos. Consulta la documentación de Avro para conocer cómo funcionan los esquemas Avro.

Puedes leer tipos de registros de Avro específicos si proporcionas un tipo de clase generado por Avro o puedes leer GenericRecords si proporcionas un objeto org.apache.avro.Schema. Por lo general, leerás el objeto Schema desde un archivo de esquema (.avsc). También puedes especificar un Schema en formato de string codificada en JSON.

Para proporcionar un esquema, usa el método .withSchema con la transformación AvroIO. Debes llamar a .withSchema cada vez que uses AvroIO.Read o AvroIO.Write.

Lee con AvroIO

La transformación AvroIO.Read lee registros de uno o más archivos Avro y crea un PCollection en el que cada elemento representa un registro. AvroIO.Read puede producir una PCollection de objetos de clase Avro generados de manera automática o de objetos GenericRecord. El tipo de PCollection producida depende del tipo de esquema que elijas.

Usar una clase de Avro generada de forma automática dará como resultado una PCollection cuyos elementos son objetos de ese tipo de clase Avro, como se muestra a continuación:

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollection<AvroAutoGenClass> records = p.apply(
    AvroIO.Read.named("ReadFromAvro")
          .from("gs://some/inputData.avro")
          .withSchema(AvroAutoGenClass.class));

Para leer tus archivos Avro en un PCollection<GenericRecord>, puedes pasar un objeto org.apache.avro.Schema o un esquema escrito como una string codificada en JSON. En el siguiente ejemplo de código, se obtiene un objeto org.apache.avro.Schema mediante el análisis de un archivo .avsc y, luego, se usa el Schema resultante a fin de leer archivos Avro de entrada fragmentados desde Google Cloud Storage:

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

  PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
                       .from("gs://my_bucket/path/records-*.avro")
                       .withSchema(schema));

Al igual que con otras fuentes de Dataflow basadas en archivos, la transformación AvroIO.Read puede leer varios archivos de entrada. Para obtener más información sobre cómo controlar varios archivos cuando se lee desde las fuentes basadas en archivos, consulta Lee datos de entrada.

Escribe con AvroIO

La transformación AvroIO.Write escribe una PCollection de registros Avro en uno o más archivos Avro. Para usar AvroIO.Write, deberás representar los datos de salida finales como una PCollection de objetos de clase Avro generados de manera automática o un PCollection de GenericRecords. Puedes usar un ParDo para transformar tus datos de manera adecuada.

Para escribir archivos específicos, usa una clase de Avro generada de forma automática como el esquema Avro:

Java

  PCollection<AvroAutoGenClass> filteredRecords = ...;
  filteredRecords.apply(AvroIO.Write.named("WriteToAvro")
                                    .to("gs://some/outputData.avro")
                                    .withSchema(AvroAutoGenClass.class)
                                    .withSuffix(".avro"));

Para escribir objetos GenericRecord, puedes pasar un org.apache.avro.Schema (a menudo mediante el análisis de un archivo .avsc) o un esquema escrito como una string codificada en JSON. En la siguiente muestra de código, se analiza un archivo .avsc a fin de obtener un objeto Schema y lo usa para escribir archivos Avro de salida fragmentados en Google Cloud Storage:

Java

  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

  PCollection<GenericRecord> records = ...;
  records.apply(AvroIO.Write.named("WriteToAvro")
                            .to("gs://my_bucket/path/numbers")
                            .withSchema(schema)
                            .withSuffix(".avro"));

Ten en cuenta que AvroIO.Write escribe en varios archivos de salida de forma predeterminada. Para obtener información adicional, consulta Escribe datos de salida.