Scrivere da Dataflow in Cloud Storage

Questo documento descrive come scrivere dati di testo da Dataflow in Cloud Storage utilizzando Apache Beam TextIO Connettore I/O.

Includi la dipendenza dalla libreria Google Cloud

Per utilizzare il connettore TextIO con Cloud Storage, includi quanto segue la dipendenza. Questa libreria fornisce un gestore di schema per i nomi file "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Vai

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Per ulteriori informazioni, vedi Installa l'SDK Apache Beam.

Parallelismo

Il parallelismo è determinato principalmente dal numero di shard. Per impostazione predefinita, runner imposta automaticamente questo valore. Per la maggior parte delle pipeline, l'utilizzo del metodo un comportamento consigliato. In questo documento, vedi Best practice.

Prestazioni

La tabella seguente mostra le metriche delle prestazioni per la scrittura di archiviazione ideale in Cloud Storage. I carichi di lavoro sono stati eseguiti su un worker e2-standard2, utilizzando Apache Beam SDK 2.49.0 per Java. Non ha utilizzato Runner v2.

100 mln di record | 1 kB | 1 colonna Velocità effettiva (byte) Velocità effettiva (elementi)
Scrittura 130 Mbps 130.000 elementi al secondo

Queste metriche si basano su pipeline batch semplici. Sono studiati per confrontare il rendimento tra connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Le prestazioni della pipeline Dataflow sono complesse e sono una funzione del tipo di VM, in fase di elaborazione, le prestazioni di origini e sink esterni e il codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentativi delle caratteristiche prestazionali di altri SDK linguistici di grandi dimensioni. Per ulteriori informazioni, vedi Beam IO Prestazioni.

Best practice

  • In generale, evita di impostare un numero specifico di shard. Questo permette al runner per selezionare un valore appropriato per la tua bilancia. Se modifichi il numero di per shard, consigliamo di scrivere un valore compreso tra 100 MB e 1 GB per shard. Tuttavia, il valore ottimale potrebbe dipendere dal carico di lavoro.

  • Cloud Storage può scalare fino a un numero molto elevato di richieste secondo. Tuttavia, se la tua pipeline presenta grandi picchi nel volume di scrittura, considera scrivere su più bucket, per evitare di sovraccaricare nel bucket Cloud Storage.

  • In generale, scrivere in Cloud Storage è più efficiente quando ogni maggiore (1 kB o più). Scrivendo piccoli record per un elevato numero di di file possono peggiorare le prestazioni per byte.

  • Quando generi nomi di file, considera l'utilizzo di nomi di file non sequenziali, nell'ordine per distribuire il carico. Per ulteriori informazioni, vedi Utilizza una convenzione di denominazione che distribuisce il carico in modo uniforme tra gli intervalli di chiavi.

  • Quando assegni un nome ai file, non utilizzare il segno chiocciola ("@") seguito da un numero o da un asterisco ("*"). Per ulteriori informazioni, vedi "@*" e "@N" sono specifiche di sharding riservate.

Esempio: scrivere file di testo in Cloud Storage

L'esempio seguente crea una pipeline batch che scrive file di testo utilizzando GZIP compressione:

Java

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Se l'input PCollection è illimitato, devi definire una finestra o un attiva il trigger sulla raccolta, quindi specifica le scritture in una finestra richiamando TextIO.Write.withWindowedWrites.

Python

Per eseguire l'autenticazione in Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv : List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Per il percorso di output, specifica un percorso Cloud Storage che includa il percorso un nome bucket e un prefisso del nome file. Ad esempio, se specifichi gs://my_bucket/output/file, il connettore TextIO scrive nel Bucket Cloud Storage denominato my_bucket e i file di output hanno il prefisso output/file*.

Per impostazione predefinita, il connettore TextIO esegue lo sharding dei file di output, utilizzando una convenzione simile alla seguente: <file-prefix>-00000-of-00001. Facoltativamente, puoi specificare il suffisso di un nome file e uno schema di compressione, come mostrato nell'esempio.

Per garantire scritture idempotenti, Dataflow scrive in un file temporaneo quindi copia il file temporaneo completato nel file finale. Per controllare dove sono archiviati questi file temporanei: utilizza la withTempDirectory .

Passaggi successivi