Von Dataflow in Cloud Storage schreiben

In diesem Dokument wird beschrieben, wie Sie Textdaten mit dem E/A-Connector TextIO von Apache Beam in Cloud Storage schreiben.

Google Cloud-Bibliotheksabhängigkeit einschließen

Schließen Sie die folgende Abhängigkeit ein, um den TextIO-Connector mit Cloud Storage zu verwenden. Diese Bibliothek bietet einen Schema-Handler für "gs://"-Dateinamen.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Einfach loslegen (Go)

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Weitere Informationen finden Sie unter Apache Beam SDK installieren.

Parallelität

Die Parallelität wird hauptsächlich durch die Anzahl der Shards bestimmt. Standardmäßig legt der Runner diesen Wert automatisch fest. Für die meisten Pipelines wird das Standardverhalten empfohlen. Siehe Best Practices in diesem Dokument.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte zum Schreiben in Cloud Storage. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Sie haben nicht Runner v2 verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben 130 Mbit/s 130.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batchpipelines. Sie sollen die Leistung von E/A-Connectors vergleichen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Leistung von Beam IO.

Best Practices

  • Im Allgemeinen sollten Sie keine bestimmte Anzahl von Shards festlegen. Dadurch kann der Runner einen geeigneten Wert für Ihre Skalierung auswählen. Wenn Sie die Anzahl der Shards optimieren, empfehlen wir zwischen 100 MB und 1 GB pro Shard. Der optimale Wert kann jedoch von der Arbeitslast abhängen.

  • Cloud Storage kann auf eine sehr große Anzahl von Anfragen pro Sekunde skaliert werden. Wenn die Pipeline jedoch große Spitzen beim Schreibvolumen aufweist, sollten Sie sie in mehrere Buckets schreiben, um eine vorübergehende Überlastung eines einzelnen Cloud Storage-Buckets zu vermeiden.

  • Im Allgemeinen ist das Schreiben in Cloud Storage effizienter, wenn jeder Schreibvorgang größer ist (1 KB oder höher). Wenn kleine Datensätze in eine große Anzahl von Dateien geschrieben werden, kann dies die Leistung pro Byte beeinträchtigen.

  • Wenn Sie Dateinamen generieren, sollten Sie nicht sequenzielle Dateinamen verwenden, um die Last zu verteilen. Weitere Informationen finden Sie unter Namenskonvention verwenden, um die Last gleichmäßig über die Schlüsselbereiche zu verteilen.

  • Verwenden Sie beim Benennen von Dateien nicht das ett-Zeichen („@“), gefolgt von einer Zahl oder einem Sternchen („*“). Weitere Informationen finden Sie unter „@*” und „@N” sind reservierte Fragmentierungsspezifikationen.

Beispiel: Textdateien in Cloud Storage schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die Textdateien mit GZIP-Komprimierung schreibt:

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Wenn die Eingabe-PCollection unbegrenzt ist, müssen Sie ein Fenster oder einen Trigger für die Sammlung definieren und dann Fensterschreibvorgänge durch Aufrufen von TextIO.Write.withWindowedWrites angeben.

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self

def write_to_cloud_storage(argv : List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Geben Sie für den Ausgabepfad einen Cloud Storage-Pfad an, der den Bucket-Namen und ein Dateinamenspräfix enthält. Wenn Sie beispielsweise gs://my_bucket/output/file angeben, schreibt der TextIO-Connector in den Cloud Storage-Bucket mit dem Namen my_bucket und die Ausgabedateien haben das Präfix output/file*.

Standardmäßig fragmentiert der TextIO-Connector die Ausgabedateien mit einer Namenskonvention wie dieser: <file-prefix>-00000-of-00001. Optional können Sie ein Suffix für den Dateinamen und ein Komprimierungsschema angeben, wie im Beispiel gezeigt.

Um idempotente Schreibvorgänge zu gewährleisten, schreibt Dataflow in eine temporäre Datei und kopiert die fertige temporäre Datei dann in die endgültige Datei.

Nächste Schritte