Von Dataflow in Cloud Storage schreiben

In diesem Dokument wird beschrieben, wie Sie mit dem E/A-Connector TextIO von Apache Beam Textdaten aus Dataflow in Cloud Storage schreiben.

Google Cloud-Bibliotheksabhängigkeit einschließen

Schließen Sie die folgende Abhängigkeit ein, um den TextIO-Connector mit Cloud Storage zu verwenden. Diese Bibliothek bietet einen Schema-Handler für "gs://"-Dateinamen.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Weitere Informationen finden Sie unter Apache Beam SDK installieren.

Parallelität

Die Parallelität wird hauptsächlich durch die Anzahl der Shards bestimmt. Standardmäßig legt der Runner diesen Wert automatisch fest. Für die meisten Pipelines wird das Standardverhalten empfohlen. In diesem Dokument finden Sie weitere Informationen unter Best Practices.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte zum Schreiben in Cloud Storage. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben 130 Mbit/s 130.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batchpipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

Best Practices

  • Vermeiden Sie im Allgemeinen die Festlegung einer bestimmten Anzahl von Shards. Dadurch kann der Runner einen geeigneten Wert für Ihre Skala auswählen. Wenn Sie die Anzahl der Shards optimieren, empfehlen wir zwischen 100 MB und 1 GB pro Shard. Der optimale Wert kann jedoch von der Arbeitslast abhängen.

  • Cloud Storage kann auf eine sehr große Anzahl von Anfragen pro Sekunde skaliert werden. Wenn Ihre Pipeline jedoch starke Spitzen im Schreibvolumen aufweist, sollten Sie sie in mehrere Buckets schreiben, um zu vermeiden, dass ein einzelner Cloud Storage-Bucket vorübergehend überlastet wird.

  • Im Allgemeinen ist das Schreiben in Cloud Storage effizienter, wenn jeder Schreibvorgang größer ist (1 KB oder höher). Das Schreiben kleiner Datensätze in eine große Anzahl an Dateien kann die Leistung pro Byte beeinträchtigen.

  • Verwenden Sie beim Generieren von Dateinamen nicht sequenzielle Dateinamen, um die Last zu verteilen. Weitere Informationen finden Sie unter Namenskonvention verwenden, um die Last gleichmäßig über die Schlüsselbereiche zu verteilen.

  • Verwenden Sie beim Benennen von Dateien nicht das ett-Zeichen („@“), gefolgt von einer Zahl oder einem Sternchen („*“). Weitere Informationen finden Sie unter „@*” und „@N” sind reservierte Fragmentierungsspezifikationen.

Beispiel: Textdateien in Cloud Storage schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die Textdateien mit GZIP-Komprimierung schreibt:

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Wenn die Eingabe-PCollection unbegrenzt ist, müssen Sie ein Fenster oder einen Trigger für die Sammlung definieren und dann Fensterschreibvorgänge durch Aufrufen von TextIO.Write.withWindowedWrites angeben.

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Geben Sie für den Ausgabepfad einen Cloud Storage-Pfad an, der den Bucket-Namen und ein Dateinamenspräfix enthält. Wenn Sie beispielsweise gs://my_bucket/output/file angeben, schreibt der TextIO-Connector in den Cloud Storage-Bucket mit dem Namen my_bucket und die Ausgabedateien haben das Präfix output/file*.

Standardmäßig fragmentiert der TextIO-Connector die Ausgabedateien mit einer Namenskonvention wie dieser: <file-prefix>-00000-of-00001. Optional können Sie ein Suffix für den Dateinamen und ein Komprimierungsschema angeben, wie im Beispiel gezeigt.

Um idempotente Schreibvorgänge sicherzustellen, schreibt Dataflow in eine temporäre Datei und kopiert die abgeschlossene temporäre Datei dann in die endgültige Datei. Mit der Methode withTempDirectory können Sie steuern, wo diese temporären Dateien gespeichert werden.

Nächste Schritte