Écrire des données de Dataflow vers Cloud Storage

Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S TextIO d'Apache Beam.

Inclure la dépendance de la bibliothèque Google Cloud

Pour utiliser le connecteur TextIO avec Cloud Storage, incluez la dépendance suivante : Cette bibliothèque fournit un gestionnaire de schéma pour les noms de fichiers de type "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Pour en savoir plus, consultez la section Installer le SDK Apache Beam.

Parallélisme

Le parallélisme est déterminé principalement par le nombre de segments. Par défaut, l'exécuteur définit automatiquement cette valeur. Pour la plupart des pipelines, il est recommandé d'utiliser le comportement par défaut. Dans ce document, consultez la section Bonnes pratiques.

Performances

Le tableau suivant présente les métriques de performances pour l'écriture dans Cloud Storage. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 Ko | 1 colonne Débit (octets) Débit (éléments)
Écriture 130 Mbit/s 130 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Bonnes pratiques

  • En général, évitez de définir un nombre spécifique de segments. Cela permet à l'exécuteur de sélectionner une valeur appropriée pour vos besoins. Si vous ajustez le nombre de segments, nous vous recommandons d'écrire entre 100 Mo et 1 Go par segment. Cependant, la valeur optimale peut dépendre de la charge de travail.

  • Cloud Storage peut évoluer jusqu'à un très grand nombre de requêtes par seconde. Toutefois, si le pipeline comporte de grands pics d'écriture, envisagez d'écrire dans plusieurs buckets afin d'éviter de surcharger temporairement un seul bucket Cloud Storage.

  • En général, l'écriture dans Cloud Storage est plus efficace lorsque chaque écriture est plus importante (1 Ko ou plus). L'écriture de petits enregistrements dans un grand nombre de fichiers peut entraîner un baisse de performance notable par octet.

  • Lorsque vous générez des noms de fichiers, pensez à utiliser des noms de fichiers non séquentiels afin de répartir la charge. Pour plus d'informations, consultez la section Utiliser une convention de dénomination qui répartit la charge uniformément entre les plages de clés.

  • Lorsque vous nommez des fichiers, n'utilisez pas le signe arobase (@) suivi d'un nombre ou d'un astérisque (*). Pour en savoir plus, consultez les sections "@*" et "@N" sont des spécifications de segmentation réservées.

Exemple : Écrire des fichiers texte dans Cloud Storage

L'exemple suivant crée un pipeline de traitement par lot qui écrit des fichiers texte avec une compression GZIP :

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour un environnement de développement local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Si la PCollection d'entrée est illimitée, vous devez définir une fenêtre ou un déclencheur sur la collection, puis spécifier les écritures fenêtrées en appelant TextIO.Write.withWindowedWrites.

Python

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour un environnement de développement local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self

def write_to_cloud_storage(argv : List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Comme chemin de sortie, spécifiez un chemin d'accès Cloud Storage comprenant le nom du bucket et un préfixe de nom de fichier. Par exemple, si vous spécifiez gs://my_bucket/output/file, le connecteur TextIO écrit dans le bucket Cloud Storage nommé my_bucket et le nom des fichiers de sortie est basé sur le préfixe output/file*.

Par défaut, le connecteur TextIO segmente les fichiers de sortie, en suivant une convention d'attribution de noms du type : <file-prefix>-00000-of-00001. Vous pouvez éventuellement spécifier un suffixe de nom de fichier et un schéma de compression, comme illustré dans l'exemple.

Pour garantir les écritures idempotentes, Dataflow écrit dans un fichier temporaire, puis copie le fichier temporaire complété dans le fichier final.

Étapes suivantes