E/S gérées Dataflow

Le connecteur d'E/S géré est une transformation Apache Beam qui fournit une API commune pour créer des sources et des récepteurs. Sur le backend, Dataflow traite le connecteur d'E/S géré comme un service, ce qui lui permet de gérer les opérations d'exécution du connecteur. Vous pouvez ensuite vous concentrer sur la logique métier de votre pipeline plutôt que sur la gestion de ces détails.

Vous créez le connecteur d'E/S géré à l'aide du code Apache Beam, comme n'importe quel autre connecteur d'E/S. Vous spécifiez une source ou un récepteur à instancier et à transmettre dans un ensemble de paramètres de configuration.

Pour en savoir plus sur l'API d'E/S gérée, consultez la page Managed dans la documentation du SDK Java Apache Beam.

Utiliser des E/S gérées avec Apache Iceberg

À partir du SDK Apache Beam pour Java 2.56.0, le connecteur d'E/S géré permet de lire et d'écrire des catalogues Apache Iceberg.

Ajoutez les dépendances suivantes :

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

L'exemple suivant montre comment créer un pipeline qui lit les données depuis Apache Iceberg et écrit les enregistrements dans un fichier texte. Cet exemple suppose que le catalogue contient une table nommée db.table1 avec deux champs nommés id et name.

Java

// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
  .put("table", "db.table1")
  .put("catalog_config", catalogConfig)
  .build();

// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((row -> {
          return String.format("%d:%s",
              row.getInt64("id"),
              row.getString("name"));
        })))
    // Write to a text file.
    .apply(
        TextIO.write()
            .to(outputPath)
            .withNumShards(1)
            .withSuffix(".txt"));

Vous pouvez également placer les paramètres de configuration dans un fichier YAML et fournir une URL vers ce fichier. Le code YAML suivant spécifie la même configuration que l'exemple de code précédent :

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

Transmettez l'URL au fichier YAML comme suit:

Java

PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfigUrl("gs://path/to/config.yaml"))