E/S administrada de Dataflow

El conector de E/S administrada es una transformación de Apache Beam que proporciona una API común para crear fuentes y receptores. En el backend, Dataflow trata el conector de E/S administrada como un servicio, lo que permite que Dataflow administre las operaciones de entorno de ejecución del conector. Luego, puedes enfocarte en la lógica empresarial de la canalización, en lugar de administrar estos detalles.

Debes crear el conector de E/S administrada con el código de Apache Beam, como cualquier otro conector de E/S. Especificas una fuente o un receptor para crear una instancia y pasar un conjunto de parámetros de configuración.

Para obtener más información sobre la API de E/S administrada, consulta Managed en la documentación del SDK de Java de Apache Beam.

Usa E/S administrada con Apache Iceberg

A partir del SDK de Apache Beam para Java 2.56.0, el conector de E/S administrado admite la lectura y escritura de catálogos de Apache Iceberg.

Agrega las siguientes dependencias:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

En el siguiente ejemplo, se muestra cómo crear una canalización que lea desde Apache Iceberg y escriba los registros en un archivo de texto. En este ejemplo, se supone que el catálogo tiene una tabla llamada db.table1 con dos campos llamados id y name.

Java

// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
  .put("table", "db.table1")
  .put("catalog_config", catalogConfig)
  .build();

// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((row -> {
          return String.format("%d:%s",
              row.getInt64("id"),
              row.getString("name"));
        })))
    // Write to a text file.
    .apply(
        TextIO.write()
            .to(outputPath)
            .withNumShards(1)
            .withSuffix(".txt"));

También puedes colocar los parámetros de configuración en un archivo YAML y proporcionar una URL al archivo. En el siguiente YAML, se especifica la misma configuración que en el ejemplo de código anterior:

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

Pasa la URL al archivo YAML de la siguiente manera:

Java

PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfigUrl("gs://path/to/config.yaml"))