Scrivere da Dataflow a Cloud Storage

Questo documento descrive come scrivere dati di testo da Dataflow in Cloud Storage utilizzando il TextIO connettore I/O di Apache Beam.

Includi la dipendenza dalla libreria Google Cloud

Per utilizzare il connettore TextIO con Cloud Storage, includi la seguente dipendenza. Questa libreria fornisce un gestore dello schema per i nomi dei file "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Vai

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Per ulteriori informazioni, consulta Installa l'SDK Apache Beam.

Abilita gRPC sul connettore I/O Apache Beam su Dataflow

Puoi connetterti a Cloud Storage utilizzando gRPC tramite il connettore I/O Apache Beam su Dataflow. gRPC è un framework RPC (chiamata di procedura remota) open source ad alte prestazioni sviluppato da Google che puoi utilizzare per interagire con Cloud Storage.

Per velocizzare le richieste di scrittura del job Dataflow in Cloud Storage, puoi attivare il connettore I/O Apache Beam su Dataflow per l'utilizzo di gRPC.

Riga di comando

  1. Assicurati di utilizzare la versione 2.55.0 o successiva dell' SDK Apache Beam.
  2. Per eseguire un job Dataflow, utilizza l'opzione di pipeline --additional-experiments=use_grpc_for_gcs. Per informazioni sulle diverse opzioni di pipeline, consulta Flag facoltativi.

SDK Apache Beam

  1. Assicurati di utilizzare la versione 2.55.0 o successiva dell' SDK Apache Beam.
  2. Per eseguire un job Dataflow, utilizza l'opzione --experiments=use_grpc_for_gcs pipeline. Per informazioni sulle diverse opzioni di pipeline, consulta Opzioni di base.

Puoi configurare il connettore I/O di Apache Beam su Dataflow per generare metriche relative a gRPC in Cloud Monitoring. Le metriche relative a gRPC possono aiutarti a:

  • Monitora e ottimizza le prestazioni delle richieste gRPC a Cloud Storage.
  • Risolvi i problemi ed esegui il debug.
  • Ottieni informazioni sull'utilizzo e sul comportamento della tua applicazione.

Per informazioni su come configurare il connettore I/O di Apache Beam su Dataflow per generare metriche relative a gRPC, consulta Utilizzare le metriche lato client. Se la raccolta delle metriche non è necessaria per il tuo caso d'uso, puoi scegliere di disattivarla. Per istruzioni, vedi Disattivare le metriche lato client.

Parallelismo

Il parallelismo è determinato principalmente dal numero di shard. Per impostazione predefinita, il runner imposta automaticamente questo valore. Per la maggior parte delle pipeline, è consigliabile utilizzare il comportamento predefinito. In questo documento, consulta Best practice.

Prestazioni

La tabella seguente mostra le metriche sul rendimento per le scritture su Cloud Storage. I carichi di lavoro sono stati eseguiti su un e2-standard2 worker utilizzando l'SDK Apache Beam 2.49.0 per Java. Non hanno utilizzato Runner v2.

100 milioni di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Scrivere 130 MB/s 130.000 elementi al secondo

Queste metriche si basano su semplici pipeline batch. Sono progettati per confrontare il rendimento tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e dipendono dal tipo di VM, dai dati in fase di elaborazione, dalle prestazioni di origini e destinazioni esterne e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche di prestazioni di altri SDK per lingua. Per ulteriori informazioni, consulta Rendimento IO di Beam.

Best practice

  • In generale, evita di impostare un numero specifico di shard. In questo modo, il gestore può selezionare un valore appropriato per la scala. Se ottimizzi il numero di frammenti, ti consigliamo di scrivere tra 100 MB e 1 GB per frammento. Tuttavia, il valore ottimale potrebbe dipendere dal carico di lavoro.

  • Cloud Storage può scalare fino a un numero molto elevato di richieste al secondo. Tuttavia, se la pipeline presenta picchi elevati nel volume di scrittura, ti consigliamo di scrivere in più bucket per evitare di sovraccaricare temporaneamente un singolo bucket Cloud Storage.

  • In generale, la scrittura in Cloud Storage è più efficiente quando ogni scrittura è più grande (1 KB o più). La scrittura di record di piccole dimensioni in un numero elevato di file può comportare un peggioramento delle prestazioni per byte.

  • Quando generi i nomi dei file, ti consigliamo di utilizzare nomi non sequenziali per distribuire il carico. Per ulteriori informazioni, consulta la sezione Utilizzare una convenzione di denominazione che distribuisca il carico in modo uniforme tra gli intervalli di chiavi.

  • Quando assegni un nome ai file, non utilizzare il simbolo @ ("@") seguito da un numero o da un asterisco ("*"). Per ulteriori informazioni, consulta "@*" e "@N" sono specifiche di sharding riservate.

Esempio: scrivere file di testo in Cloud Storage

L'esempio seguente crea una pipeline batch che scrive file di testo utilizzando la compressione GZIP:

Java

Per autenticarti a Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se l'input PCollection è illimitato, devi definire una finestra o un attivatore nella raccolta e poi specificare le scritture con finestra chiamando TextIO.Write.withWindowedWrites.

Python

Per autenticarti a Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Per il percorso di output, specifica un percorso Cloud Storage che includa il nome del bucket e un prefisso del nome del file. Ad esempio, se specifichi gs://my_bucket/output/file, il connettore TextIO scrive nel my_bucket, il bucket Cloud Storage denominato my_bucket e i file di output hanno il prefisso output/file*.

Per impostazione predefinita, il connettore TextIO suddivide i file di output utilizzando una convenzione di denominazione come questa: <file-prefix>-00000-of-00001. Se vuoi, puoi specificare un suffisso del nome file e uno schema di compressione, come mostrato nell'esempio.

Per garantire scritture idempotenti, Dataflow scrive in un file temporaneo e poi copia il file temporaneo completato nel file finale. Per controllare la posizione in cui vengono archiviati questi file temporanei, utilizza il metodo withTempDirectory.

Passaggi successivi