Écrire des données de Dataflow vers Cloud Storage

Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S TextIO d'Apache Beam.

Inclure la dépendance de la bibliothèque Google Cloud

Pour utiliser le connecteur TextIO avec Cloud Storage, incluez la dépendance suivante : Cette bibliothèque fournit un gestionnaire de schéma pour les noms de fichiers de type "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Pour en savoir plus, consultez la section Installer le SDK Apache Beam.

Activer gRPC sur le connecteur d'E/S Apache Beam dans Dataflow

Vous pouvez vous connecter à Cloud Storage à l'aide de gRPC via le connecteur d'E/S Apache Beam sur Dataflow. gRPC est un framework d'appel de procédure à distance (RPC) Open Source hautes performances développé par Google que vous pouvez utiliser pour interagir avec Cloud Storage.

Pour accélérer les requêtes d'écriture de votre tâche Dataflow vers Cloud Storage, vous pouvez activer le connecteur d'E/S Apache Beam sur Dataflow pour utiliser gRPC.

Ligne de commande

  1. Assurez-vous d'utiliser le SDK Apache Beam version 2.55.0 ou ultérieure.
  2. Pour exécuter un job Dataflow, utilisez l'option de pipeline --additional-experiments=use_grpc_for_gcs. Pour en savoir plus sur les différentes options de pipeline, consultez la section Options facultatives.

SDK Apache Beam

  1. Assurez-vous d'utiliser le SDK Apache Beam version 2.55.0 ou ultérieure.
  2. Pour exécuter un job Dataflow, utilisez l'option de pipeline --experiments=use_grpc_for_gcs. Pour en savoir plus sur les différentes options de pipeline, consultez la section Options de base.

Vous pouvez configurer le connecteur d'E/S Apache Beam sur Dataflow pour générer des métriques liées à gRPC dans Cloud Monitoring. Les métriques liées à gRPC peuvent vous aider à effectuer les opérations suivantes:

  • Surveillez et optimisez les performances des requêtes gRPC envoyées à Cloud Storage.
  • Dépanner et déboguer les problèmes
  • Obtenez des insights sur l'utilisation et le comportement de votre application.

Pour savoir comment configurer le connecteur d'E/S Apache Beam sur Dataflow afin de générer des métriques liées à gRPC, consultez Utiliser des métriques côté client. Si la collecte de métriques n'est pas nécessaire pour votre cas d'utilisation, vous pouvez désactiver la collecte de métriques. Pour obtenir des instructions, consultez Désactiver les métriques côté client.

Parallélisme

Le parallélisme est déterminé principalement par le nombre de segments. Par défaut, l'exécuteur définit automatiquement cette valeur. Pour la plupart des pipelines, il est recommandé d'utiliser le comportement par défaut. Dans ce document, consultez la section Bonnes pratiques.

Performances

Le tableau suivant présente les métriques de performances pour l'écriture dans Cloud Storage. Les charges de travail ont été exécutées sur un nœud de calcul e2-standard2 à l'aide du SDK Apache Beam 2.49.0 pour Java. Elles n'ont pas utilisé l'exécuteur v2.

100 millions d'enregistrements | 1 Ko | 1 colonne Débit (octets) Débit (éléments)
Écriture 130 Mbit/s 130 000 éléments par seconde

Ces métriques sont basées sur des pipelines de traitement par lot simples. Elles ont pour but de comparer les performances entre les connecteurs d'E/S et ne sont pas nécessairement représentatives des pipelines réels. Les performances des pipelines Dataflow sont complexes et dépendent du type de machine virtuelle, des données traitées, des performances des sources et des récepteurs externes, ainsi que du code utilisateur. Les métriques sont basées sur l'exécution du SDK Java et ne sont pas représentatives des caractéristiques de performances des SDK d'autres langages. Pour en savoir plus, consultez la page Performances d'E/S Beam.

Bonnes pratiques

  • En général, évitez de définir un nombre spécifique de segments. Cela permet à l'exécuteur de sélectionner une valeur appropriée pour vos besoins. Si vous ajustez le nombre de segments, nous vous recommandons d'écrire entre 100 Mo et 1 Go par segment. Cependant, la valeur optimale peut dépendre de la charge de travail.

  • Cloud Storage peut évoluer jusqu'à un très grand nombre de requêtes par seconde. Toutefois, si le pipeline comporte de grands pics d'écriture, envisagez d'écrire dans plusieurs buckets afin d'éviter de surcharger temporairement un seul bucket Cloud Storage.

  • En général, l'écriture dans Cloud Storage est plus efficace lorsque chaque écriture est plus importante (1 Ko ou plus). L'écriture de petits enregistrements dans un grand nombre de fichiers peut entraîner un baisse de performance notable par octet.

  • Lorsque vous générez des noms de fichiers, pensez à utiliser des noms de fichiers non séquentiels afin de répartir la charge. Pour plus d'informations, consultez la section Utiliser une convention de dénomination qui répartit la charge uniformément entre les plages de clés.

  • Lorsque vous nommez des fichiers, n'utilisez pas le signe arobase (@) suivi d'un nombre ou d'un astérisque (*). Pour en savoir plus, consultez les sections "@*" et "@N" sont des spécifications de segmentation réservées.

Exemple : Écrire des fichiers texte dans Cloud Storage

L'exemple suivant crée un pipeline de traitement par lot qui écrit des fichiers texte avec une compression GZIP :

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Si la PCollection d'entrée est illimitée, vous devez définir une fenêtre ou un déclencheur sur la collection, puis spécifier les écritures fenêtrées en appelant TextIO.Write.withWindowedWrites.

Python

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Comme chemin de sortie, spécifiez un chemin d'accès Cloud Storage comprenant le nom du bucket et un préfixe de nom de fichier. Par exemple, si vous spécifiez gs://my_bucket/output/file, le connecteur TextIO écrit dans le bucket Cloud Storage nommé my_bucket et le nom des fichiers de sortie est basé sur le préfixe output/file*.

Par défaut, le connecteur TextIO segmente les fichiers de sortie, en suivant une convention d'attribution de noms du type : <file-prefix>-00000-of-00001. Vous pouvez éventuellement spécifier un suffixe de nom de fichier et un schéma de compression, comme illustré dans l'exemple.

Pour garantir les écritures idempotentes, Dataflow écrit dans un fichier temporaire, puis copie le fichier temporaire complété dans le fichier final. Pour contrôler l'emplacement de stockage de ces fichiers temporaires, utilisez la méthode withTempDirectory.

Étape suivante