Von Dataflow in Cloud Storage schreiben

In diesem Dokument wird beschrieben, wie Sie mit dem E/A-Connector TextIO von Apache Beam Textdaten aus Dataflow in Cloud Storage schreiben.

Google Cloud-Bibliotheksabhängigkeit einschließen

Schließen Sie die folgende Abhängigkeit ein, um den TextIO-Connector mit Cloud Storage zu verwenden. Diese Bibliothek bietet einen Schema-Handler für "gs://"-Dateinamen.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Weitere Informationen finden Sie unter Apache Beam SDK installieren.

gRPC für den Apache Beam-E/A-Connector in Dataflow aktivieren

Sie können über den Apache Beam-E/A-Connector in Dataflow mit gRPC eine Verbindung zu Cloud Storage herstellen. gRPC ist ein leistungsstarkes Open-Source-RPC-Framework (Remote Procedure Call) von Google, mit dem Sie mit Cloud Storage interagieren können.

Um die Schreibanfragen Ihres Dataflow-Jobs an Cloud Storage zu beschleunigen, können Sie den Apache Beam-E/A-Connector in Dataflow für die Verwendung von gRPC aktivieren.

Befehlszeile

  1. Verwenden Sie das Apache Beam SDK in der Version 2.55.0 oder höher.
  2. Verwenden Sie die Pipelineoption --additional-experiments=use_grpc_for_gcs, um einen Dataflow-Job auszuführen. Informationen zu den verschiedenen Pipelineoptionen finden Sie unter Optionale Flags.

Apache Beam SDK

  1. Verwenden Sie das Apache Beam SDK in der Version 2.55.0 oder höher.
  2. Verwenden Sie die Pipelineoption --experiments=use_grpc_for_gcs, um einen Dataflow-Job auszuführen. Informationen zu den verschiedenen Pipelineoptionen finden Sie unter Grundlegende Optionen.

Sie können den Apache Beam-E/A-Connector in Dataflow konfigurieren, um gRPC-bezogene Messwerte in Cloud Monitoring zu generieren. Mit den gRPC-bezogenen Messwerten können Sie Folgendes tun:

  • Leistung von gRPC-Anfragen an Cloud Storage überwachen und optimieren
  • Probleme beheben
  • Sie erhalten Informationen zur Nutzung und zum Verhalten Ihrer Anwendung.

Informationen zum Konfigurieren des Apache Beam-E/A-Connectors in Dataflow zum Generieren von gRPC-bezogenen Messwerten finden Sie unter Clientseitige Messwerte verwenden. Wenn die Erfassung von Messwerten für Ihren Anwendungsfall nicht erforderlich ist, können Sie die Erfassung deaktivieren. Eine Anleitung finden Sie unter Clientseitige Messwerte deaktivieren.

Parallelität

Die Parallelisierung wird hauptsächlich durch die Anzahl der Shards bestimmt. Standardmäßig wird dieser Wert automatisch vom Runner festgelegt. Für die meisten Pipelines wird das Standardverhalten empfohlen. Weitere Informationen finden Sie in diesem Dokument unter Best Practices.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte für das Schreiben in Cloud Storage. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben 130 Mbit/s 130.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

Best Practices

  • Vermeiden Sie es im Allgemeinen, eine bestimmte Anzahl von Shards festzulegen. So kann der Läufer einen geeigneten Wert für die Skalierung auswählen. Wenn Sie die Anzahl der Shards optimieren, empfehlen wir zwischen 100 MB und 1 GB pro Shard. Der optimale Wert hängt jedoch von der Arbeitslast ab.

  • Cloud Storage kann auf eine sehr große Anzahl von Anfragen pro Sekunde skaliert werden. Wenn in Ihrer Pipeline jedoch große Spitzen beim Schreibvolumen auftreten, sollten Sie in mehrere Buckets schreiben, um eine vorübergehende Überlastung eines einzelnen Cloud Storage-Bucket zu vermeiden.

  • Im Allgemeinen ist das Schreiben in Cloud Storage effizienter, wenn jeder Schreibvorgang größer ist (1 KB oder höher). Wenn Sie kleine Datensätze in eine große Anzahl von Dateien schreiben, kann dies zu einer geringeren Leistung pro Byte führen.

  • Verwenden Sie beim Generieren von Dateinamen nicht sequenzielle Dateinamen, um die Auslastung zu verteilen. Weitere Informationen finden Sie unter Namenskonvention verwenden, um die Last gleichmäßig über die Schlüsselbereiche zu verteilen.

  • Verwenden Sie beim Benennen von Dateien nicht das ett-Zeichen („@“), gefolgt von einer Zahl oder einem Sternchen („*“). Weitere Informationen finden Sie unter „@*” und „@N” sind reservierte Fragmentierungsspezifikationen.

Beispiel: Textdateien in Cloud Storage schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die Textdateien mit GZIP-Komprimierung schreibt:

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

Wenn die Eingabe-PCollection unbegrenzt ist, müssen Sie ein Fenster oder einen Trigger für die Sammlung definieren und dann Fensterschreibvorgänge durch Aufrufen von TextIO.Write.withWindowedWrites angeben.

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

Geben Sie für den Ausgabepfad einen Cloud Storage-Pfad an, der den Bucket-Namen und ein Dateinamenspräfix enthält. Wenn Sie beispielsweise gs://my_bucket/output/file angeben, schreibt der TextIO-Connector in den Cloud Storage-Bucket mit dem Namen my_bucket und die Ausgabedateien haben das Präfix output/file*.

Standardmäßig fragmentiert der TextIO-Connector die Ausgabedateien mit einer Namenskonvention wie dieser: <file-prefix>-00000-of-00001. Optional können Sie ein Suffix für den Dateinamen und ein Komprimierungsschema angeben, wie im Beispiel gezeigt.

Um idempotente Schreibvorgänge zu ermöglichen, schreibt Dataflow in eine temporäre Datei und kopiert die fertige temporäre Datei dann in die endgültige Datei. Mit der Methode withTempDirectory können Sie festlegen, wo diese temporären Dateien gespeichert werden.

Nächste Schritte