E/S gérées Dataflow

Le connecteur d'E/S géré est une transformation Apache Beam qui fournit une API commune pour créer des sources et des récepteurs.

Présentation

Vous créez le connecteur d'E/S géré à l'aide du code Apache Beam, comme n'importe quel autre connecteur d'E/S. Vous spécifiez une source ou un récepteur à instancier et à transmettre dans un ensemble de paramètres de configuration. Vous pouvez également placer les paramètres de configuration dans un fichier YAML et fournir une URL vers ce fichier.

Sur le backend, Dataflow traite le connecteur d'E/S géré comme un service, ce qui lui permet de gérer les opérations d'exécution du connecteur. Vous pouvez ensuite vous concentrer sur la logique métier de votre pipeline plutôt que sur la gestion de ces détails.

Pour en savoir plus sur l'API d'E/S gérée, consultez la page Managed dans la documentation du SDK Java Apache Beam.

Apache Iceberg

Pour Apache Iceberg, les E/S gérées utilisent les paramètres de configuration suivants :

Nom Type de données Description
table chaîne Identifiant de la table Apache Iceberg. Exemple : "db.table1".
catalog_name chaîne Nom du catalogue. Exemple : "local".
catalog_properties carte Mappage des propriétés de configuration pour le catalogue Apache Iceberg. Les propriétés requises dépendent du catalogue. Pour en savoir plus, consultez CatalogUtil dans la documentation Apache Iceberg.
config_properties carte Ensemble facultatif de propriétés de configuration Hadoop. Pour en savoir plus, consultez la page CatalogUtil dans la documentation Apache Iceberg.

Pour en savoir plus et obtenir des exemples de code, consultez les sujets suivants :

Apache Kafka

Pour Apache Kafka, les E/S gérées utilisent les paramètres de configuration suivants :

Configuration en lecture et en écriture Type de données Description
bootstrap_servers chaîne Obligatoire. Liste de serveurs d'amorçage Kafka, séparés par une virgule. Exemple : localhost:9092.
topic chaîne Obligatoire. Sujet Kafka à lire.
file_descriptor_path chaîne Chemin d'accès à un ensemble de descripteurs de fichier du tampon de protocole. S'applique uniquement si la valeur de data_format est "PROTO".
data_format chaîne Format des messages. Valeurs autorisées : "AVRO", "JSON", "PROTO", "RAW". La valeur par défaut est "RAW", qui lit ou écrit les octets bruts de la charge utile du message.
message_name chaîne Nom du message du tampon de protocole. Obligatoire si la valeur de data_format est "PROTO".
schema chaîne

Schéma de message Kafka. Le type de schéma attendu dépend du format de données :

Pour les pipelines de lecture, ce paramètre est ignoré si la valeur de confluent_schema_registry_url est définie.

Lire la configuration
auto_offset_reset_config chaîne

Spécifie le comportement en l'absence de décalage initial ou si le décalage actuel n'existe plus sur le serveur Kafka. Les valeurs suivantes sont acceptées :

  • "earliest" : réinitialise le décalage à la valeur la plus ancienne.
  • "latest" : réinitialise le décalage à la valeur la plus récente.

La valeur par défaut est "latest".

confluent_schema_registry_subject chaîne Sujet d'un registre de schéma Confluent. Obligatoire si le paramètre confluent_schema_registry_url est spécifié.
confluent_schema_registry_url chaîne URL d'un registre de schéma Confluent. Si spécifié, le paramètre schema est ignoré.
consumer_config_updates carte Définit les paramètres de configuration du client Kafka. Pour en savoir plus, consultez la section Configurations des clients dans la documentation de Kafka. Vous pouvez utiliser ce paramètre pour personnaliser le client Kafka.
max_read_time_seconds int Durée de lecture maximale, en secondes. Cette option génère une PCollection limitée et est principalement destinée aux tests ou à d'autres scénarios hors production.
Écrire la configuration
producer_config_updates carte Définit les paramètres de configuration du producteur Kafka. Pour en savoir plus, consultez la section Configurations des producteurs dans la documentation de Kafka. Vous pouvez utiliser ce paramètre pour personnaliser le producteur Kafka.

Pour lire des messages Avro ou JSON, vous devez spécifier un schéma de message. Pour définir un schéma directement, utilisez le paramètre schema. Pour fournir le schéma via un registre de schéma Confluent, définissez les paramètres confluent_schema_registry_url et confluent_schema_registry_subject.

Pour lire ou écrire des messages de tampon de protocole, spécifiez un schéma de message ou définissez le paramètre file_descriptor_path.