E/S pipeline

Pour créer un pipeline, vous aurez généralement besoin de lire des données provenant d'une source externe, par exemple un fichier stocké dans Google Cloud Storage ou une table BigQuery. De même, vous souhaiterez que votre pipeline transmette les données de résultat à un récepteur de données externe analogue, comme des fichiers de sortie dans Cloud Storage ou BigQuery. Les SDK Dataflow fournissent des transformations capables de lire les données d'une source externe ou d'écrire des données sur un récepteur externe.

Les SDK Dataflow fournissent des transformations Read et Write pour un certain nombre de types de stockage de données courants. En outre, les API Read et Write sont extensibles. Si vous souhaitez que votre pipeline lise ou écrive dans un format de stockage de données non compatible avec les transformations intégrées, vous pouvez créer des extensions pour fournir vos propres opérations Read et Write.

Lire des données d'entrée

Read transforme les données lues d'une source externe et renvoie une représentation PCollection des données à utiliser par votre pipeline. Utilisez une transformation Read à tout moment pendant la construction de votre pipeline pour créer un nouveau PCollection, bien que ce soit plus courant au début de votre programme.

Java

Remarque : Les transformations Read n'ayant pas de PCollection en entrée, elles sont directement appliquées à Pipeline. Comme d'habitude, l'appel à apply renvoie un PCollection du type approprié, dont les éléments représentent les données. Pour plus d'informations, consultez la page Construire votre pipeline.

Lire des données depuis plusieurs emplacements

De nombreuses transformations Read, telles que Text, prennent en charge la lecture de plusieurs fichiers d'entrée correspondant à un opérateur global que vous spécifiez. Pensez à utiliser la transformation Read, qui utilise un opérateur global (*) pour lire tous les fichiers d'entrée correspondant à l'emplacement indiqué dans Google Cloud Storage :

Java

      p.apply(TextIO.Read.named("ReadFromText")
    		     .from("gs://my_bucket/path/to/input-*.csv");
    

Le Read ci-dessus lit tous les fichiers à l'emplacement indiqué dans Cloud Storage par le préfixe "input-" "et le suffixe ".csv".

Pour lire les données de sources disparates en un seul PCollection, lisez chacune d'entre elles de manière indépendante, puis utilisez la transformation Flatten pour créer un seul PCollection.

Écrire des données de sortie

Les transformations Write écrivent les données dans un PCollection vers une source de données externe. Vous utiliserez le plus souvent les transformations Write à la fin de votre programme pour afficher les résultats finaux de votre pipeline. Cependant, vous pouvez utiliser Write pour obtenir en sortie les données d'un PCollection à n'importe quelle étape de votre pipeline.

Pour utiliser une transformation Write, vous devez appeler la méthode apply sur le PCollection que vous souhaitez écrire et transmettre la transformation Write appropriée en tant qu'argument.

Java

Lorsque vous apply une transformation Write en PCollection, la valeur renvoyée est un objet de type PDone. L'objet PDone est un objet de résultat trivial et peut être ignoré.

Écrire dans plusieurs fichiers de sortie

Pour les données d'entrée et de sortie basées sur des fichiers, telles que les données texte, les transformations Write écrivent par défaut dans plusieurs fichiers de sortie. Le service Cloud Dataflow génère toujours automatiquement des fichiers de sortie fragmentés. Lorsque vous transmettez un nom de fichier de sortie à une transformation Write, le nom du fichier est utilisé comme préfixe pour tous les fichiers de sortie que produit la transformation Write.

Vous pouvez ajouter un suffixe à chaque fichier de sortie en spécifiant un suffixe à votre transformation Write.

Pensez à l'utilisation suivante de la transformation Write, qui écrit plusieurs fichiers de sortie dans un emplacement Cloud Storage. Chaque fichier est désigné par le préfixe "numbers", un tag numérique unique et le suffixe ".csv".

Java

      records.apply(TextIO.Write.named("WriteToText")
    			    .to("gs://my_bucket/path/to/numbers")
    			    .withSuffix(".csv"));
    

Le Write ci-dessus permet d'écrire plusieurs fichiers de sortie à l'emplacement indiqué dans Cloud Storage avec le préfixe "numbers" et le suffixe ".csv".

API d'E/S incluses dans les SDK Dataflow

Certaines API de source et de récepteur sont incluses dans les SDK Dataflow.

Java

Le SDK Dataflow pour Java fournit les transformations Read et Write pour certains formats de données courants :

API d'E/S supplémentaires

En plus des API d'E/S, les SDK Dataflow fournissent une API extensible, qui vous permet de créer vos propres sources et récepteurs de données personnalisés.

Java

Vous pouvez créer vos propres sources d'entrée et récepteurs de sortie personnalisés à l'aide des API de source et de récepteur de Dataflow.