Best Practices für Dataflow-Pipelines

Auf dieser Seite werden Best Practices für die Entwicklung von Dataflow-Pipelines beschrieben. Die Verwendung dieser Best Practices hat folgende Vorteile:

  • Beobachtbarkeit und Leistung der Pipeline verbessern
  • Entwicklerproduktivität steigern
  • Testbarkeit von Pipelines verbessern

Die Apache Beam-Codebeispiele auf dieser Seite verwenden Java, der Inhalt gilt jedoch für die Apache Beam Java, Python und Go SDKs.

Zu berücksichtigende Fragen:

Berücksichtigen Sie beim Entwerfen Ihrer Pipeline die folgenden Fragen:

  • Wo sind die Eingabedaten Ihrer Pipeline gespeichert? Wie viele Sätze an Eingabedaten haben Sie?
  • Wie sehen meine Daten aus?
  • Was möchte ich mit meinen Daten tun?
  • Wo sollten die Ausgabedaten Ihrer Pipeline liegen?
  • Verwendet Ihr Dataflow-Job Assured Workloads?

Vorlagen verwenden

Verwenden Sie zum Beschleunigen der Pipelineentwicklung, anstatt eine Pipeline durch Schreiben von Apache Beam-Code zu erstellen, nach Möglichkeit eine Dataflow-Vorlage. Vorlagen haben folgende Vorteile:

  • Vorlagen sind wiederverwendbar.
  • Mit Vorlagen können Sie jeden Job anpassen, indem Sie bestimmte Pipelineparameter ändern.
  • Jeder, dem Sie Berechtigungen zuweisen, kann die Vorlage zum Bereitstellen der Pipeline verwenden. Beispielsweise kann ein Entwickler einen Job aus einer Vorlage erstellen und ein Data Scientist in der Organisation kann diese Vorlage später bereitstellen.

Sie können eine von Google bereitgestellte Vorlage verwenden oder eine eigene Vorlage erstellen. Einige von Google bereitgestellte Vorlagen ermöglichen Ihnen, benutzerdefinierte Logik als Pipelineschritt hinzuzufügen. Die Vorlage Pub/Sub für BigQuery enthält beispielsweise einen Parameter, um eine in Cloud Storage gespeicherte benutzerdefinierte JavaScript-Funktion (UDF) auszuführen.

Von Google bereitgestellte Vorlagen sind Open-Source-Ressourcen unter der Apache-Lizenz 2.0, sodass Sie sie als Grundlage für neue Pipelines verwenden können. Die Vorlagen sind auch als Codebeispiele nützlich. Sehen Sie sich den Vorlagencode im GitHub-Repository an.

Assured Workloads

Assured Workloads hilft bei der Durchsetzung von Sicherheits- und Compliance-Anforderungen für Google Cloud-Kunden. Beispiel: EU-Regionen und -Support mit Datenhoheitskontrollen helfen bei der Durchsetzung der Anforderungen an Datenstandort und Datenhoheit für Kunden in der EU. Um diese Funktionen bereitzustellen, sind einige Dataflow-Funktionen eingeschränkt oder begrenzt. Wenn Sie Assured Workloads mit Dataflow verwenden, müssen sich alle Ressourcen, auf die Ihre Pipeline zugreift, im Assured Workloads-Projekt oder -Ordner Ihrer Organisation befinden. Zu diesen Ressourcen gehören:

  • Cloud Storage-Buckets
  • BigQuery-Datasets
  • Pub/Sub-Themen und -Abos
  • Firestore-Datasets
  • E/A-Connectors

In Dataflow werden für Streamingjobs, die nach dem 7. März 2024 erstellt wurden, alle Nutzerdaten mit CMEK verschlüsselt.

Bei Streamingjobs, die vor dem 7. März 2024 erstellt wurden, sind Datenschlüssel, die für schlüsselbasierte Vorgänge wie Windowing, Gruppierung und Joining verwendet werden, nicht durch eine CMEK-Verschlüsselung geschützt. Um diese Verschlüsselung für Ihre Jobs zu aktivieren, beenden Sie den Job per Drain oder brechen Sie den Job ab und starten Sie ihn dann neu. Weitere Informationen finden Sie unter Verschlüsselung von Pipelinestatus-Artefakten.

Daten über Pipelines hinweg teilen

Es gibt keinen Dataflow-spezifischen pipelineübergreifenden Kommunikationsmechanismus zum Teilen von Daten oder zum Verarbeiten von Kontextinformationen zwischen Pipelines. Sie können eine langfristige Speichermöglichkeit wie Cloud Storage oder einen In-Memory-Cache wie App Engine für den Austausch von Daten zwischen Pipelineinstanzen verwenden.

Jobs planen

Sie können die Pipelineausführung auf folgende Arten automatisieren:

Best Practices für das Schreiben von Pipelinecode

Die folgenden Abschnitte enthalten Best Practices, die Sie beim Erstellen von Pipelines durch Schreiben von Apache Beam-Code verwenden können.

Apache Beam-Code strukturieren

Zum Erstellen von Pipelines wird in der Regel die generische Apache Beam-Transformation ParDo zur parallelen Verarbeitung verwendet. Wenn Sie eine ParDo-Transformation anwenden, geben Sie Code in Form eines DoFn-Objekts an. DoFn ist eine Apache Beam SDK-Klasse, die eine verteilte Verarbeitungsfunktion definiert.

Sie können sich den DoFn-Code als kleine, unabhängige Entitäten vorstellen. Es besteht die Möglichkeit, zahlreiche Instanzen auf unterschiedlichen Maschinen auszuführen, ohne dass diesen die Existenz der anderen Instanzen bekannt ist. Daher empfehlen wir, reine Funktionen zu erstellen, die sich ideal für die parallele und verteilte Natur von DoFn-Elementen eignen. Reine Funktionen haben folgende Eigenschaften:

  • Reine Funktionen hängen nicht vom ausgeblendeten oder externen Status ab.
  • Sie haben keine sichtbaren Nebenwirkungen.
  • Sie sind deterministisch.

Das reine Funktionsmodell ist nicht völlig starr. Wenn Ihr Code nicht von Faktoren abhängt, die vom Dataflow-Dienst nicht garantiert werden, können Statusinformationen oder externe Initialisierungsdaten für DoFn und andere Funktionsobjekte gültig sein.

Beachten Sie beim Strukturieren der ParDo-Transformationen und beim Erstellen der DoFn-Elemente folgende Richtlinien:

  • Wenn Sie die genau einmalige Verarbeitung verwenden, garantiert der Dataflow-Dienst, dass jedes Element in Ihrer Eingabe PCollection von einer Instanz DoFn genau einmal verarbeitet wird.
  • Der Dataflow-Dienst garantiert nicht, wie oft eine DoFn aufgerufen wird.
  • Der Dataflow-Dienst gibt keine genaue Garantie dafür, wie die verteilten Elemente gruppiert werden. Es wird nicht garantiert, ob und welche Elemente zusammen verarbeitet werden.
  • Der Dataflow-Dienst garantiert nicht die genaue Anzahl von DoFn-Instanzen, die im Verlauf einer Pipeline erstellt werden.
  • Der Dataflow-Dienst toleriert Fehler. Wenn die Worker Probleme haben, kann die Ausführung des Codes mehrfach wiederholt werden.
  • Der Dataflow-Dienst erstellt möglicherweise Sicherungskopien Ihres Codes. Probleme können mit manuellen Nebeneffekten auftreten, z. B. wenn Ihr Code auf temporären Dateien mit nicht eindeutigen Namen basiert oder solche Dateien erstellt.
  • Der Dataflow-Dienst serialisiert die Elementverarbeitung pro DoFn-Instanz. Der Code muss nicht zwingend threadsicher sein, aber jeglicher Status, der von mehreren DoFn-Instanzen gemeinsam genutzt wird, muss threadsicher sein.

Bibliotheken wiederverwendbarer Transformationen erstellen

Mit dem Apache Beam-Programmiermodell können Sie Transformationen wiederverwenden. Durch Erstellen einer gemeinsam genutzten Bibliothek mit gemeinsamen Transformationen können Sie die Wiederverwendbarkeit, Testbarkeit und Codeinhaberschaft von verschiedenen Teams verbessern.

Sehen Sie sich dazu die folgenden zwei Java-Codebeispiele an, die beide Zahlungsereignisse lesen. Sofern beide Pipelines dieselbe Verarbeitung ausführen, können sie dieselben Transformationen über eine gemeinsam genutzte Bibliothek für die verbleibenden Verarbeitungsschritte verwenden.

Das erste Beispiel stammt aus einer unbegrenzten Pub/Sub-Quelle:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

Das zweite Beispiel stammt aus einer begrenzten relationalen Datenbankquelle:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

Wie Sie Best Practices für die Wiederverwendbarkeit von Code umsetzen, variiert je nach Programmiersprache und Build-Tool. Wenn Sie beispielsweise Maven verwenden, lässt sich der Transformationscode in ein eigenes Modul ausgliedern. Anschließend können Sie das Modul als Submodul in größere Projekte mit mehreren Modulen für verschiedene Pipelines einbinden, wie im folgenden Codebeispiel gezeigt:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Weitere Informationen finden Sie auf den folgenden Apache Beam-Dokumentationsseiten:

Dead-Letter-Warteschlangen für die Fehlerbehandlung verwenden

Manchmal kann Ihre Pipeline keine Elemente verarbeiten. Datenprobleme sind eine häufige Ursache. So kann ein Element mit falsch formatiertem JSON-Code zu Parsing-Fehlern führen.

Sie können zwar Ausnahmen in der Methode DoFn.ProcessElement abfangen, den Fehler protokollieren und das Element entfernen, jedoch gehen bei diesem Ansatz die Daten verloren und es wird verhindert, dass die Daten später zur manuellen Verarbeitung oder Fehlerbehebung geprüft werden,

Verwenden Sie stattdessen ein Muster, das als Dead-Letter-Warteschlange (Warteschlange für unverarbeitete Nachrichten) bezeichnet wird. Fangen Sie Ausnahmen in der Methode DoFn.ProcessElement ab und protokollieren Sie Fehler. Anstatt das fehlgeschlagene Element zu löschen, verwenden Sie verzweigte Ausgaben, um fehlgeschlagene Elemente in ein separates PCollection-Objekt zu schreiben. Diese Elemente werden dann über eine separate Transformation zur späteren Prüfung und Verarbeitung in eine Datensenke geschrieben.

Das folgende Java-Codebeispiel zeigt, wie das Muster für Dead-Letter-Warteschlangen implementiert wird.

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

Mithilfe von Cloud Monitoring lassen sich verschiedene Monitoring- und Benachrichtigungsrichtlinien für die Dead-Letter-Warteschlange der Pipeline anwenden. Sie können beispielsweise die Anzahl und Größe der von Ihrer Dead-Letter-Transformation verarbeiteten Elemente visualisieren und Benachrichtigungen so konfigurieren, dass sie ausgelöst werden, wenn bestimmte Grenzwertbedingungen erfüllt sind.

Schemamutationen verarbeiten

Sie können Daten, die unerwartete (aber gültige) Schemas haben, mithilfe eines Dead-Letter-Musters verarbeiten, das fehlgeschlagene Elemente in ein separates PCollection-Objekt schreibt. In einigen Fällen ist es sinnvoll, Elemente, die ein mutiertes Schema widerspiegeln, automatisch als gültige Elemente zu verarbeiten. Wenn beispielsweise das Schema eines Elements eine Mutation wie die Ergänzung neuer Felder enthält, können Sie das Schema der Datensenke so anpassen, dass Mutationen berücksichtigt werden.

Die automatische Schemamutation basiert auf dem Ansatz für verzweigte Ausgaben, der vom Dead-Letter-Muster verwendet wird. In diesem Fall wird jedoch eine Transformation ausgelöst, die das Zielschema ändert, wenn additive Schemas gefunden werden. Ein Beispiel für diesen Ansatz finden Sie unter How to handle mutating JSON schemas in a streaming pipeline, with Square Enix im Google Cloud-Blog.

Entscheiden, wie Datenbanken verknüpft werden sollen

Dataset-Joins werden bei Datenpipelines häufig angewendet. Sie können Nebeneingaben oder die CoGroupByKey-Transformation verwenden, um Joins in Ihrer Pipeline durchzuführen. Beide Wege haben Vor- und Nachteile.

Nebeneingaben stellen eine flexible Möglichkeit dar, gängige Datenverarbeitungsprobleme wie Datenanreicherung und schlüsselbasierte Lookups zu lösen. Im Gegensatz zu PCollection-Objekten sind Nebeneingaben änderbar und können zur Laufzeit bestimmt werden. So lassen sich die Werte in einer Nebeneingabe von einem anderen Zweig in der Pipeline berechnen oder durch Aufrufen eines Remote-Dienstes bestimmen.

Zur Unterstützung von Nebeneingaben legt Dataflow Daten in einem nichtflüchtigen Speicher (ähnlich einem freigegebenen Laufwerk) dauerhaft ab. Durch diese Konfiguration steht die komplette Nebeneingabe allen Workern zur Verfügung.

Nebeneingaben können jedoch sehr groß sein und passen möglicherweise nicht in den Worker-Arbeitsspeicher. Lesevorgänge aus einer großen Nebeneingabe können zu Leistungsproblemen führen, wenn Worker ständig aus dem nichtflüchtigen Speicher lesen müssen.

Die CoGroupByKey-Transformation ist eine Apache Beam-Kerntransformation, die mehrere PCollection-Objekte zusammenführt (vereinfacht) und Elemente gruppiert, die einen gemeinsamen Schlüssel haben. Im Gegensatz zu einer Nebeneingabe, die jedem Worker alle Daten einer Nebeneingabe zur Verfügung stellt, führt CoGroupByKey einen Shuffle-Vorgang (Gruppierung) aus, um Daten auf Worker zu verteilen. Daher ist CoGroupByKey ideal, wenn die zu verknüpfenden PCollection-Objekte sehr groß sind und nicht in den Worker-Arbeitsspeicher passen.

Orientieren Sie sich an den folgenden Richtlinien, wenn Sie sich für die Verwendung von Nebeneingaben oder CoGroupByKey entscheiden:

  • Verwenden Sie Nebeneingaben, wenn eines der zu verknüpfenden PCollection-Objekte unverhältnismäßig kleiner als die anderen ist und das kleinere PCollection-Objekt in den Worker-Arbeitsspeicher passt. Wenn die gesamte Nebeneingabe im Cache gespeichert wird, können Elemente schnell und effizient abgerufen werden.
  • Verwenden Sie Nebeneingaben, wenn Sie ein PCollection-Objekt haben, das in der Pipeline mehrmals verknüpft werden muss. Statt mehrere CoGroupByKey-Transformationen zu verwenden, erstellen Sie eine einzelne Nebeneingabe, die von mehreren ParDo-Transformationen wiederverwendet werden kann.
  • Verwenden Sie CoGroupByKey, wenn Sie einen großen Teil eines PCollection-Objekts abrufen müssen, das den Worker-Arbeitsspeicher erheblich überschreitet.

Weitere Informationen finden Sie unter Fehlerbehebung bei Dataflow-Fehlern aufgrund von fehlerhaftem Arbeitsspeicher.

Aufwendige Vorgänge pro Element minimieren

Eine DoFn-Instanz verarbeitet Batches von Elementen, die als Bundles bezeichnet werden. Dies sind atomare Arbeitseinheiten, die aus null oder mehr Elementen bestehen. Einzelne Elemente werden dann von der Methode DoFn.ProcessElement verarbeitet, die für jedes Element ausgeführt wird. Da die Methode DoFn.ProcessElement für jedes Element aufgerufen wird, werden alle von dieser Methode aufgerufenen zeit- oder rechenintensiven Vorgänge für jedes einzelne Element ausgeführt, das von der Methode verarbeitet wird.

Wenn Sie aufwendige Vorgänge nur einmal für einen Batch von Elementen ausführen müssen, sollten Sie diese Vorgänge in die Methode DoFn.Setup oder die Methode DoFn.StartBundle statt in das Element DoFn.ProcessElement aufnehmen. Beispiele hierfür sind die folgenden Vorgänge:

  • Parsen einer Konfigurationsdatei, die einige Verhaltensweisen der DoFn-Instanz steuert. Rufen Sie diese Aktion nur einmal auf, wenn die Instanz DoFn über die Methode DoFn.Setup initialisiert wird.

  • Instanziieren eines kurzlebigen Clients, der für alle Elemente in einem Bundle wiederverwendet wird, z. B. wenn alle Elemente im Bundle über eine einzige Netzwerkverbindung gesendet werden. Rufen Sie diese Aktion mit der Methode DoFn.StartBundle einmal pro Bundle auf.

Batchgrößen und gleichzeitige Aufrufe externer Dienste begrenzen

Wenn Sie externe Dienste aufrufen, können Sie die Transformation GroupIntoBatches verwenden, um den Overhead pro Aufruf zu reduzieren. Diese Transformation erstellt Batches von Elementen einer bestimmten Größe. Batching sendet Elemente als eine einzige Nutzlast statt einzeln an einen externen Dienst.

In Kombination mit Batching begrenzen Sie die maximale Anzahl paralleler (gleichzeitiger) Aufrufe des externen Dienstes, indem Sie geeignete Schlüssel zum Partitionieren der eingehenden Daten auswählen. Die Anzahl der Partitionen bestimmt die maximale Parallelisierung. Wenn beispielsweise jedes Element denselben Schlüssel erhält, wird eine nachgelagerte Transformation zum Aufrufen des externen Dienstes nicht parallel ausgeführt.

Sie können einen der folgenden Ansätze verwenden, um Schlüssel für Elemente zu erstellen:

  • Wählen Sie ein Attribut des Datasets aus, das als Datenschlüssel verwendet werden soll, z. B. Nutzer-IDs.
  • Generieren Sie Datenschlüssel, um Elemente nach dem Zufallsprinzip auf eine feste Anzahl von Partitionen aufzuteilen, wobei die Anzahl der möglichen Schlüsselwerte die Anzahl der Partitionen bestimmt. Sie müssen genügend Partitionen für die Parallelisierung erstellen. Jede Partition muss genügend Elemente haben, damit die GroupIntoBatches-Transformation nützlich ist.

Das folgende Java-Codebeispiel zeigt, wie Elemente nach dem Zufallsprinzip auf 10 Partitionen aufgeteilt werden:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Leistungsprobleme aufgrund zusammengeführter Schritte identifizieren

Dataflow erstellt eine Grafik der Schritte, die Ihre Pipeline darstellen, auf Basis der Transformationen und Daten, die Sie zum Erstellen der Pipeline verwendet haben. Diese Grafik wird als Ausführungsgrafik der Pipeline bezeichnet.

Wenn Sie die Pipeline bereitstellen, ändert Dataflow möglicherweise die Ausführungsgrafik der Pipeline, um die Leistung zu verbessern. Beispielsweise kann Dataflow einige Vorgänge zusammenführen – ein Prozess, der als Zusammenführungsoptimierung bezeichnet wird. Damit sollen die Leistungs- und Kostenauswirkungen vermieden werden, die mit dem Schreiben eines jeden PCollection-Zwischenobjekts in der Pipeline einhergehen.

In einigen Fällen kann Dataflow eine falsche optimale Zusammenführung von Vorgängen in der Pipeline ermitteln. Dies kann die Fähigkeit Ihres Jobs einschränken, alle verfügbaren Worker zu verwenden. In solchen Fällen können Sie das Zusammenführen von Vorgängen verhindern.

Sehen Sie sich den folgenden Apache Beam-Beispielcode an. Eine GenerateSequence-Transformation erstellt ein kleines begrenztes PCollection-Objekt, das wiederum von zwei nachgelagerten ParDo-Transformationen verarbeitet wird.

Die Find Primes Less-than-N-Transformation kann rechenintensiv sein und wird bei großen Zahlen wahrscheinlich langsam ausgeführt. Im Gegensatz dazu wird die Transformation Increment Number wahrscheinlich schnell abgeschlossen.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

Das folgende Diagramm zeigt eine grafische Darstellung der Pipeline in der Dataflow-Monitoring-Oberfläche.

Diagramm: Darstellung des Pipelineflusses in der Dataflow-Oberfläche.

Die Dataflow-Monitoring-Oberfläche zeigt, dass für beide Transformationen dieselbe langsame Verarbeitungsrate auftritt, insbesondere 13 Elemente pro Sekunde. Man könnte erwarten, dass die Increment Number-Transformation Elemente schnell verarbeitet, stattdessen scheint sie aber an die gleiche Verarbeitungsrate wie Find Primes Less-than-N gebunden zu sein.

Dies liegt daran, dass Dataflow die Schritte zu einer einzigen Phase zusammengeführt hat und damit verhindert, dass sie unabhängig voneinander ausgeführt werden. Mit dem Befehl gcloud dataflow jobs describe können Sie weitere Informationen abrufen:

gcloud dataflow jobs describe --full job-id --format json

In der resultierenden Ausgabe werden die zusammengeführten Schritte im Objekt ExecutionStageSummary im Array ComponentTransform beschrieben:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

Da die Find Primes Less-than-N-Transformation den langsamen Schritt darstellt, ist in diesem Szenario die Teilung der Zusammenführung vor diesem Schritt sinnvoll. Eine Methode zum Aufheben der Zusammenführung von Schritten besteht darin, vor dem Schritt eine GroupByKey-Transformation einzufügen und die Gruppierung aufzuheben, wie im folgenden Java-Codebeispiel gezeigt.

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Sie können diese Schritte zum Aufheben der Zusammenführung auch zu einer wiederverwendbaren zusammengesetzten Transformation kombinieren.

Nachdem Sie die Zusammenführung der Schritte aufgehoben haben, wird Increment Number beim Ausführen der Pipeline in wenigen Sekunden abgeschlossen und die viel länger laufende Find Primes Less-than-N-Transformation wird in einer separaten Phase ausgeführt.

In diesem Beispiel wird der Vorgang zum Gruppieren und Aufheben der Gruppierung angewendet, um die Zusammenführung von Schritten aufzuheben. In anderen Situationen können Sie andere Ansätze verwenden. Im obigen Fall stellt die Verarbeitung doppelter Ausgaben aufgrund der aufeinanderfolgenden Ausgabe der GenerateSequence-Transformation kein Problem dar. KV-Objekte mit doppelten Schlüsseln werden zu einem einzigen Schlüssel in den Transformationen zum Gruppieren (GroupByKey) und Aufheben der Gruppierung (Keys) dedupliziert. Wenn Sie Duplikate nach den Vorgängen zum Gruppieren und Aufheben der Gruppierung beibehalten möchten, erstellen Sie mithilfe der folgenden Schritte Schlüssel/Wert-Paare:

  1. Verwenden Sie einen zufälligen Schlüssel und die ursprüngliche Eingabe als Wert.
  2. Gruppieren Sie mit dem zufälligen Schlüssel.
  3. Geben Sie die Werte für jeden Schlüssel als Ausgabe aus.

Sie können auch eine Reshuffle-Transformation verwenden, um die Zusammenführung von umgebenden Transformationen zu verhindern. Allerdings sind die Nebeneffekte der Reshuffle-Transformation nicht auf verschiedene Apache Beam-Runner übertragbar.

Weitere Informationen zur Parallelität und Fusionsoptimierung finden Sie unter Pipelinelebenszyklus.

Pipeline-Statistiken mithilfe von Apache Beam-Messwerten erfassen

Apache Beam-Messwerte sind eine Dienstprogrammklasse, die Messwerte zum Melden der Attribute einer ausgeführten Pipeline erstellt. Wenn Sie Cloud Monitoring verwenden, stehen Apache Beam-Messwerte als benutzerdefinierte Cloud Monitoring-Messwerte zur Verfügung.

Das folgende Beispiel zeigt Counter-Messwerte von Apache Beam, die in einer abgeleiteten Klasse DoFn verwendet werden.

Im Beispielcode werden zwei Zähler verwendet. Der eine Zähler verfolgt JSON-Parsing-Fehler (malformedCounter) und der andere Zähler verfolgt, ob die JSON-Nachricht gültig ist, aber eine leere Nutzlast enthält (emptyCounter). In Cloud Monitoring lauten die Namen der benutzerdefinierten Messwerte custom.googleapis.com/dataflow/malformedJson und custom.googleapis.com/dataflow/emptyPayload. Sie können die benutzerdefinierten Messwerte verwenden, um Visualisierungen und Benachrichtigungsrichtlinien in Cloud Monitoring zu erstellen.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

Weitere Informationen

Auf den folgenden Seiten erfahren Sie mehr darüber, wie Sie Ihre Pipeline strukturieren, wie Sie die Transformationen auswählen, die auf Ihre Daten angewendet werden sollen, und was Sie bei der Auswahl der Eingabe- und Ausgabemethoden Ihrer Pipeline berücksichtigen sollten.

Weitere Informationen zum Erstellen von Nutzercode finden Sie unter Anforderungen für vom Nutzer bereitgestellte Funktionen.