E/S Avro

Les transformations intégrées Read et Write pour les fichiers Avro sont incluses dans AvroIO. Vous pouvez utiliser AvroIO pour lire et écrire des fichiers locaux sur le système où votre programme Java s'exécute et des fichiers distants dans Google Cloud Storage.

Java

Remarque : Si vous souhaitez que votre pipeline lise ou écrive des fichiers locaux, vous devez utiliser DirectPipelineRunner pour exécuter votre pipeline localement. En effet, les instances de Google Compute Engine auxquelles le service Dataflow fait appel pour exécuter ce pipeline ne pourront pas accéder en lecture ni en écriture aux fichiers situés sur votre ordinateur local.

Spécifier un schéma Avro

Pour utiliser AvroIO, vous devez spécifier un schéma Avro décrivant les enregistrements à lire ou à écrire. Avro s'appuie sur des schémas pour décrire la sérialisation des données. Pour savoir comment fonctionnent ces schémas, consultez la documentation Avro.

Vous avez la possibilité de lire des enregistrements Avro spécifiques en fournissant un type de classe généré par Avro. Vous pouvez aussi lire GenericRecords en fournissant un objet org.apache.avro.Schema. Généralement, vous lisez l'objet Schema à partir d'un fichier de schéma (.avsc). Vous pouvez également spécifier un Schema dans un formulaire de chaîne encodé au format JSON.

Pour fournir un schéma, utilisez la méthode .withSchema avec la transformation AvroIO. Vous devez appeler .withSchema chaque fois que vous utilisez AvroIO.Read ou AvroIO.Write.

Lire avec AvroIO

La transformation AvroIO.Read lit les enregistrements d'un ou de plusieurs fichiers Avro et crée un PCollection dans lequel chaque élément représente un enregistrement. AvroIO.Read peut générer une collection PCollection contenant soit des objets d'une classe Avro générée automatiquement, soit des objets GenericRecord. Le type de PCollection résultant dépend du type de schéma que vous choisissez.

L'utilisation d'une classe Avro générée automatiquement se traduira par PCollection dont les éléments sont des objets relevant de ce même type de classe Avro, comme illustré ci-dessous :

Java

      PipelineOptions options = PipelineOptionsFactory.create();
      Pipeline p = Pipeline.create(options);

      PCollection<AvroAutoGenClass> records = p.apply(
        AvroIO.Read.named("ReadFromAvro")
              .from("gs://some/inputData.avro")
              .withSchema(AvroAutoGenClass.class));
    

Pour lire vos fichiers Avro dans PCollection<GenericRecord>, vous pouvez transmettre un objet org.apache.avro.Schema ou un schéma écrit en tant que chaîne codée au format JSON. L'exemple de code suivant obtient un objet org.apache.avro.Schema en analysant un fichier .avsc, puis utilise le Schema obtenu pour lire les fichiers Avro entrants partitionnés de Google Cloud Storage :

Java

      PipelineOptions options = PipelineOptionsFactory.create();
      Pipeline p = Pipeline.create(options);

      Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

      PCollection<GenericRecord> records =
        p.apply(AvroIO.Read.named("ReadFromAvro")
                           .from("gs://my_bucket/path/records-*.avro")
                           .withSchema(schema));
    

Comme avec d'autres sources Dataflow basées sur des fichiers, la transformation AvroIO.Read peut lire plusieurs fichiers d'entrée. Consultez la section Lire des données d'entrée pour savoir comment gérer plusieurs fichiers lorsque vous effectuez des opérations de lecture à partir de sources basées sur des fichiers.

Écrire avec AvroIO

La transformation AvroIO.Write écrit un enregistrement Avro PCollection dans un ou plusieurs fichiers Avro. Pour utiliser AvroIO.Write, vous devez représenter vos données de sortie finales sous la forme PCollection d'objets de classe Avro générés automatiquement ou de PCollection sur GenericRecords. Vous pouvez appliquer une transformation ParDo aux données pour leur donner la forme appropriée.

Pour écrire des enregistrements spécifiques, utilisez comme schéma Avro une classe Avro générée automatiquement :

Java

      PCollection<AvroAutoGenClass> filteredRecords = ...;
      filteredRecords.apply(AvroIO.Write.named("WriteToAvro")
                                        .to("gs://some/outputData.avro")
                                        .withSchema(AvroAutoGenClass.class)
                                        .withSuffix(".avro"));
    

Pour écrire des objets GenericRecord, vous pouvez transmettre un org.apache.avro.Schema (souvent en analysant un fichier .avsc) ou un schéma écrit en tant que chaîne codée au format JSON. L'exemple de code suivant analyse un fichier .avsc pour obtenir un objet Schema et l'utilise pour écrire des fichiers Avro de sortie partitionnés dans Google Cloud Storage :

Java

      Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

      PCollection<GenericRecord> records = ...;
      records.apply(AvroIO.Write.named("WriteToAvro")
                                .to("gs://my_bucket/path/numbers")
                                .withSchema(schema)
                                .withSuffix(".avro"));
    

Notez que AvroIO.Write écrit sur plusieurs fichiers de sortie par défaut. Pour plus d'informations, consultez la section Écrire des données de sortie.