Lire des données depuis Apache Iceberg vers Dataflow

Pour lire des données d'Apache Iceberg dans Dataflow, utilisez le connecteur d'E/S géré.

Dépendances

Ajoutez les dépendances suivantes au projet :

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configuration

Pour Apache Iceberg, les E/S gérées utilisent les paramètres de configuration suivants :

Nom Type de données Description
table chaîne Identifiant de la table Apache Iceberg. Exemple : "db.table1".
catalog_name chaîne Nom du catalogue. Exemple : "local".
catalog_properties carte Mappage des propriétés de configuration pour le catalogue Apache Iceberg. Les propriétés requises dépendent du catalogue. Pour en savoir plus, consultez CatalogUtil dans la documentation Apache Iceberg.
config_properties carte Ensemble facultatif de propriétés de configuration Hadoop. Pour en savoir plus, consultez la page CatalogUtil dans la documentation Apache Iceberg.

Exemple

L'exemple suivant lit une table Apache Iceberg et écrit les données dans des fichiers texte.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config))
        .getSinglePCollection()
        // Format each record as a string with the format 'id:name'.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              return String.format("%d:%s",
                  row.getInt64("id"),
                  row.getString("name"));
            })))
        // Write to a text file.
        .apply(
            TextIO.write()
                .to(options.getOutputPath())
                .withNumShards(1)
                .withSuffix(".txt"));

    pipeline.run().waitUntilFinish();
  }
}