Scrivere da Dataflow in Cloud Storage

Questo documento descrive come scrivere dati di testo da Dataflow per Cloud Storage utilizzando Apache Beam TextIO Connettore I/O.

Includi la dipendenza dalla libreria Google Cloud

Per utilizzare il connettore TextIO con Cloud Storage, includi la seguente dipendenza. Questa libreria fornisce un gestore di schema per i nomi file "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Vai

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Per ulteriori informazioni, vedi Installa l'SDK Apache Beam.

Abilita gRPC sul connettore I/O Apache Beam su Dataflow

Puoi connetterti a Cloud Storage utilizzando gRPC tramite il connettore I/O Apache Beam su Dataflow. gRPC è un framework RPC (Remote Procedure Call) open source ad alte prestazioni sviluppato da Google che puoi utilizzare per interagire con Cloud Storage.

Per accelerare le richieste di scrittura del job Dataflow su Cloud Storage, puoi abilitare il connettore Apache Beam I/O su Dataflow per utilizzare gRPC.

Riga di comando

  1. Assicurati di utilizzare lo Versione dell'SDK Apache Beam 2.55.0 o versioni successive.
  2. Per eseguire un job Dataflow, utilizza l'opzione pipeline --additional-experiments=use_grpc_for_gcs. Per informazioni sulle diverse opzioni della pipeline, consulta Flag facoltativi.

SDK Apache Beam

  1. Assicurati di utilizzare la versione 2.55.0 o successiva dell'SDK Apache Beam.
  2. Per eseguire un job Dataflow, utilizza l'opzione --experiments=use_grpc_for_gcs pipeline. Per informazioni sulle diverse opzioni di pipeline, consulta Opzioni di base.

Puoi configurare il connettore Apache Beam I/O su Dataflow per generare gRPC e le metriche correlate in Cloud Monitoring. Le metriche relative a gRPC possono aiutarti a:

  • Monitora e ottimizza le prestazioni delle richieste gRPC a Cloud Storage.
  • Risolvere i problemi ed eseguirne il debug.
  • Ottieni informazioni sull'utilizzo e sul comportamento della tua applicazione.

Per informazioni su come configurare il connettore I/O di Apache Beam su Dataflow per generare metriche relative a gRPC, consulta Utilizzare le metriche lato client. Se la raccolta delle metriche non è necessaria per il tuo caso d'uso, puoi scegliere di disattivare la raccolta. Per istruzioni, vedi Disattivare le metriche lato client.

Parallelismo

Il parallelismo è determinato principalmente dal numero di shard. Per impostazione predefinita, il runner imposta automaticamente questo valore. Per la maggior parte delle pipeline, l'utilizzo del metodo di rete. In questo documento, vedi Best practice.

Prestazioni

La tabella seguente mostra le metriche delle prestazioni per la scrittura di archiviazione ideale in Cloud Storage. I carichi di lavoro sono stati eseguiti su un worker e2-standard2, utilizzando Apache Beam SDK 2.49.0 per Java. Non ha utilizzato Runner v2.

100 milioni di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Scrittura 130 Mbps 130.000 elementi al secondo

Queste metriche si basano su pipeline batch semplici. Sono progettati per confrontare il rendimento tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e sono una funzione del tipo di VM, la configurazione in fase di elaborazione, le prestazioni di origini e sink esterni e il codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentativi delle caratteristiche prestazionali di altri SDK linguistici di grandi dimensioni. Per ulteriori informazioni, consulta la sezione I/O di trasmissione Prestazioni.

Best practice

  • In generale, evita di impostare un numero specifico di shard. Questo permette al runner per selezionare un valore appropriato per la tua bilancia. Se ottimizzi il numero di frammenti, ti consigliamo di scrivere tra 100 MB e 1 GB per frammento. Tuttavia, il valore ottimale potrebbe dipendere dal carico di lavoro.

  • Cloud Storage può scalare fino a un numero molto elevato di richieste secondo. Tuttavia, se la tua pipeline presenta grandi picchi nel volume di scrittura, considera scrivere su più bucket, per evitare di sovraccaricare nel bucket Cloud Storage.

  • In generale, scrivere in Cloud Storage è più efficiente quando ogni maggiore (1 kB o più). La scrittura di record di piccole dimensioni in un numero elevato di file può comportare un peggioramento delle prestazioni per byte.

  • Quando generi i nomi dei file, ti consigliamo di utilizzare nomi non sequenziali per distribuire il carico. Per ulteriori informazioni, consulta la sezione Utilizzare una convenzione di denominazione che distribuisca il carico in modo uniforme tra gli intervalli di chiavi.

  • Quando assegni un nome ai file, non utilizzare il simbolo @ ("@") seguito da un numero o da un asterisco ("*"). Per ulteriori informazioni, consulta "@*" e "@N" sono specifiche di sharding riservate.

Esempio: scrivere file di testo in Cloud Storage

L'esempio seguente crea una pipeline batch che scrive file di testo utilizzando GZIP compressione:

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se l'input PCollection è illimitato, devi definire una finestra o un attiva il trigger sulla raccolta, quindi specifica le scritture in una finestra richiamando TextIO.Write.withWindowedWrites

Python

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Per il percorso di output, specifica un percorso Cloud Storage che includa il percorso un nome bucket e un prefisso del nome file. Ad esempio, se specifichi gs://my_bucket/output/file, il connettore TextIO scrive nel my_bucket, il bucket Cloud Storage denominato my_bucket e i file di output hanno il prefisso output/file*.

Per impostazione predefinita, il connettore TextIO esegue lo sharding dei file di output, utilizzando una convenzione simile alla seguente: <file-prefix>-00000-of-00001. Facoltativamente, puoi specificare il suffisso di un nome file e uno schema di compressione, come mostrato nell'esempio.

Per garantire scritture idempotenti, Dataflow scrive in un file temporaneo quindi copia il file temporaneo completato nel file finale. Per controllare la posizione in cui vengono archiviati questi file temporanei, utilizza il metodo withTempDirectory.

Passaggi successivi