Dataflow-Pipelines entwickeln und testen

Diese Seite enthält Best Practices zum Entwickeln und Testen Ihrer Dataflow-Pipeline.

Überblick

Wie gut Ihre Pipeline in der Produktion funktioniert, hängt stark davon ab, wie Sie den Code für die Pipeline implementieren. Damit Sie korrekt und effizient funktionierenden Pipelinecode erstellen können, wird in diesem Dokument Folgendes erläutert:

  • Pipeline-Runner zur Unterstützung der Codeausführung in den verschiedenen Phasen der Entwicklung und Bereitstellung.
  • Bereitstellungsumgebungen, mit denen Sie Pipelines während der Entwicklungs-, Test-, Vorproduktions- und Produktionsphase ausführen können.
  • Open-Source-Pipelinecode und -Vorlagen, die Sie unverändert oder als Grundlage für neue Pipelines verwenden können, um die Codeentwicklung zu beschleunigen.
  • Ein Best-Practices-Ansatz zum Testen von Pipelinecode. Dieses Dokument gibt zuerst einen Überblick über den Umfang und die Beziehung verschiedener Arten von Tests wie Unit-, Integrations- und End-to-End-Tests. Dann wird jede Art von Test ausführlich erläutert. Unter anderem lernen Sie Methoden zum Erstellen und Einbinden von Testdaten kennen und erfahren, welche Pipeline-Runner für die einzelnen Tests zu verwenden sind.

Pipeline-Runner

In der Entwicklungs- und Testphase verwenden Sie verschiedene Apache Beam-Runner zum Ausführen von Pipelinecode. Das Apache Beam SDK bietet einen Direct Runner für die lokale Entwicklung und Tests. Ihre Tools zur Releaseautomatisierung können auch den Direct Runner für Unit- und Integrationstests verwenden. Sie können beispielsweise den Direct Runner in der Continuous-Integration-Pipeline (CI) verwenden.

Pipelines, die in Dataflow bereitgestellt werden, verwenden den Dataflow-Runner, der die Pipeline in produktionsähnlichen Umgebungen ausführt. Darüber hinaus können Sie den Dataflow Runner für Ad-hoc-Entwicklungstests und End-to-End-Pipelinetests verwenden.

Obwohl sich diese Seite auf die Ausführung von Pipelines konzentriert, die mit dem Apache Beam Java SDK erstellt wurden, unterstützt Dataflow auch Apache Beam-Pipelines, die mit Python und Go entwickelt wurden. Die Apache Beam Java, Python und Go SDKs sind für Dataflow allgemein verfügbar. SQL-Entwickler können auch Apache Beam SQL verwenden, um Pipelines zu erstellen, die vertraute SQL-Dialekte verwenden.

Bereitstellungsumgebung einrichten

Erstellen Sie Bereitstellungsumgebungen, um Nutzer, Daten, Code und andere Ressourcen in verschiedenen Entwicklungsphasen voneinander zu trennen. Wenn möglich, sollten Sie separate Google Cloud-Projekte verwenden, um isolierte Umgebungen für die verschiedenen Phasen der Pipelineentwicklung bereitzustellen.

In den folgenden Abschnitten wird ein üblicher Satz an Bereitstellungsumgebungen beschrieben.

Lokale Umgebung

Die lokale Umgebung ist die Workstation eines Entwicklers. Verwenden Sie für Entwicklung und schnelle Tests den Direct Runner, um Pipeline-Code lokal auszuführen.

Pipelines, die lokal mit dem Direct Runner ausgeführt werden, können mit Google Cloud-Remote-Ressourcen wie Pub/Sub-Themen oder BigQuery-Tabellen interagieren. Weisen Sie Entwicklern jeweils eigene Google Cloud-Projekte zu, damit sie eine Sandbox für Ad-hoc-Tests mit Google Cloud-Diensten haben.

Einige Google Cloud-Dienste wie Pub/Sub und Bigtable bieten Emulatoren für die lokale Entwicklung. Sie können diese Emulatoren mit dem Direct Runner verwenden, um die lokale End-to-End-Entwicklung und lokale Tests zu ermöglichen.

Sandbox-Umgebung

Die Sandbox-Umgebung ist ein Google Cloud-Projekt, das Entwicklern während der Codeentwicklung Zugriff auf Google Cloud-Dienste gibt. Pipelineentwickler können ein Google Cloud-Projekt zusammen mit anderen Entwicklern nutzen oder ihre eigenen Projekte verwenden. Die Verwendung eigener Projekte vereinfacht die Planung im Hinblick auf die gemeinsame Nutzung von Ressourcen und Kontingentverwaltung.

Entwickler verwenden die Sandbox-Umgebung für Ad-hoc-Pipelineausführungen mit dem Dataflow Runner. Die Sandbox-Umgebung ist nützlich, um während der Codeentwicklungsphase Code mit einem Produktions-Runner zu debuggen und zu testen. Bei der Ad-hoc-Pipelineausführung können Entwickler beispielsweise:

  • die Auswirkungen von Codeänderungen auf das Skalierungsverhalten beobachten,
  • potenzielle Unterschiede zwischen dem Verhalten des Direct Runners und des Dataflow-Runners erkennen und
  • nachvollziehen, wie Dataflow Grafikoptimierungen anwendet.

Für Ad-hoc-Tests können Entwickler Code aus ihrer lokalen Umgebung bereitstellen, um Dataflow in ihrer Sandbox-Umgebung auszuführen.

Vorproduktionsumgebung

Die Vorproduktionsumgebung ist für Entwicklungsphasen vorgesehen, die unter produktionsähnlichen Bedingungen erfolgen müssen, beispielsweise End-to-End-Tests. Verwenden Sie ein separates Projekt für die Vorproduktionsumgebung und konfigurieren Sie es so, dass es nach Möglichkeit der Produktionsumgebung gleicht. Ähnliches gilt für End-to-End-Tests: Damit diese mit produktionsähnlichem Umfang ausgeführt werden können, sollten Sie Google Cloud-Projektkontingente für Dataflow und andere Dienste so dimensionieren, dass sie der Produktionsumgebung möglichst ähnlich sind.

Je nach Ihren Anforderungen können Sie die Vorproduktion weiter in mehrere Umgebungen unterteilen. Beispielsweise kann eine Qualitätskontrollumgebung die Arbeit von Qualitätsanalysten unterstützen, um Service Level Objectives (SLOs) wie Datenrichtigkeit, -aktualität und -leistung unter verschiedenen Arbeitslastbedingungen zu testen.

Bei End-to-End-Tests werden unter anderem Datenquellen und -senken im Rahmen der Test eingebunden. Überlegen Sie sich, wie Sie diese in der Vorproduktionsumgebung zur Verfügung stellen können. Sie können Testdaten direkt in der Vorproduktionsumgebung speichern. Zum Beispiel werden Testdaten in einem Cloud Storage-Bucket mit Ihren Eingabedaten gespeichert. In anderen Fällen können die Testdaten aus einer Quelle außerhalb der Vorproduktionsumgebung stammen, z. B. aus einem Pub/Sub-Thema über ein separates Abo, das sich in der Produktionsumgebung befindet. Für Streamingpipelines können Sie auch End-to-End-Tests mit generierten Daten ausführen, z. B. mit dem Dataflow Streaming Data Generator, um produktionsähnliche Dateneigenschaften und Umfänge zu emulieren.

Verwenden Sie für Streamingpipelines die Vorproduktionsumgebung, um Pipelineupdates zu testen, bevor Änderungen an der Produktion vorgenommen werden. Es ist wichtig, Updateverfahren für Streamingpipelines zu testen und zu prüfen, insbesondere wenn Sie mehrere Schritte koordinieren müssen, z. B. wenn parallele Pipelines ausgeführt werden, um Ausfallzeiten zu vermeiden.

Produktionsumgebung

Die Produktionsumgebung ist ein dediziertes Google Cloud-Projekt. Bei der Continuous Delivery werden Bereitstellungsartefakte in die Produktionsumgebung kopiert, wenn alle End-to-End-Tests bestanden wurden.

Best Practices für die Entwicklung

Siehe Best Practices für Dataflow-Pipelines.

Eigene Pipeline testen

Bei der Softwareentwicklung sind Unit-, Integrations- und End-to-End-Tests gängige Methoden zum Testen von Software. Diese Testmethoden gelten auch für Datenpipelines.

Das Apache Beam SDK bietet Funktionen zum Aktivieren dieser Tests. Idealerweise ist jeder Testtyp auf eine andere Bereitstellungsumgebung ausgerichtet. Das folgende Diagramm veranschaulicht, wie Unit-, Integrations- und End-to-End-Tests für verschiedene Teile der Pipeline und Daten angewendet werden.

Diagramm: Arten von Tests und ihre Beziehung zu Transformationen, Pipelines, Datenquellen und Datensenken.

Das Diagramm zeigt den Umfang verschiedener Tests und deren Beziehung zu Transformationen (von DoFn und PTransform abgeleiteten Klassen), Pipelines, Datenquellen und Datensenken.

In den folgenden Abschnitten wird beschrieben, wie verschiedene formale Softwaretests mit Dataflow auf Datenpipelines angewendet werden. Greifen Sie bei der Lektüre dieses Abschnitts auf dieses Diagramm zurück, um nachvollziehen zu können, wie die verschiedenen Arten von Tests zusammenhängen.

Stichprobenerhebung

Aktivieren Sie während des Tests die Stichprobenerhebung, um die Daten bei jedem Schritt einer Dataflow-Pipeline zu beobachten. Auf diese Weise können Sie sich die Ausgaben der Transformationen ansehen und so prüfen, ob die Ausgabe korrekt ist.

Unittests

Unittests bewerten die ordnungsgemäße Funktion von abgeleiteten Klassen von DoFn und von zusammengesetzten Transformationen (von PTransform abgeleitete Klassen). Dazu wird die Ausgabe dieser Transformationen mit einem verifizierten Satz von Dateneingaben und -ausgaben verglichen. In der Regel können Entwickler diese Tests in der lokalen Umgebung ausführen. Die Tests lassen sich auch automatisch über die Unittest-Automatisierung mithilfe von Continuous Integration (CI) in der Build-Umgebung ausführen.

Der Direct Runner führt Unittests mit einer kleineren Teilmenge von Referenztestdaten aus. Der Schwerpunkt liegt dabei auf dem Testen der Geschäftslogik Ihrer Transformationen. Die Testdaten müssen klein genug sein, um in den lokalen Arbeitsspeicher des Computers zu passen, auf dem der Test ausgeführt wird.

Das Apache Beam SDK bietet eine JUnit-Regel namens TestPipeline für die Ausführung von Unittests für einzelne Transformationen (von DoFn abgeleitete Klassen), zusammengesetzte Transformationen (von PTransform abgeleitete Klassen) und ganze Pipelines. Sie können TestPipeline in einem Apache Beam-Pipeline-Runner wie dem Direct Runner oder dem Dataflow Runner verwenden, um Assertions auf den Inhalt von PCollection-Objekten anzuwenden. Dazu verwenden Sie PAssert, wie im folgenden Code-Snippet einer JUnit-Testklasse gezeigt:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Unittests für einzelne Transformationen

Durch die Faktorisierung des Codes in wiederverwendbare Transformationen (z. B. als Top-Level- oder statische verschachtelte Klassen) können Sie zielgerichtete Tests für verschiedene Teile der Pipeline erstellen. Neben den Vorteilen von Tests verbessern wiederverwendbare Transformationen die Verwaltbarkeit und Wiederverwendbarkeit von Code, da die Geschäftslogik der Pipeline von Natur aus in Komponenten gekapselt wird. Allerdings kann das Testen einzelner Teile der Pipeline schwierig sein, wenn die Pipeline anonyme innere Klassen zum Implementieren von Transformationen verwendet.

Das folgende Java-Snippet zeigt die Implementierung von Transformationen als anonyme innere Klassen, was die Ausführung von Tests erschwert.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Vergleichen Sie das vorherige Beispiel mit dem folgenden Beispiel, bei dem die anonymen inneren Klassen in benannte konkrete abgeleitete Klassen von DoFn refaktoriert werden. Sie können für jede konkrete abgeleitete Klasse von DoFn, aus denen die End-to-End-Pipeline besteht, einzelne Unittests erstellen.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

Das Testen jeder abgeleiteten Klasse von DoFn ähnelt einem Unittest für eine Batchpipeline, die eine einzelne Transformation enthält. Erstellen Sie mit der Transformation Create ein PCollection-Objekt mit Testdaten und übergeben Sie es an das Objekt DoFn. Bestätigen Sie mit PAssert, dass der Inhalt des PCollection-Objekts korrekt ist. Im folgenden Java-Codebeispiel wird die Klasse PAssert verwendet, um das Ausgabeformat auf Richtigkeit zu prüfen.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Integrationstests

Integrationstests prüfen, ob Ihre gesamte Pipeline ordnungsgemäß funktioniert. Betrachten Sie die folgenden Arten von Integrationstests:

  • Einen Transformationsintegrationstest, der die integrierte Funktionalität der einzelnen Transformationen bewertet, aus denen die Datenpipeline besteht. Sie können sich Transformationsintegrationstests als Unittest für die gesamte Pipeline vorstellen, ausgenommen die Einbindung in externe Datenquellen und -senken. Das Apache Beam SDK bietet Methoden zum Bereitstellen von Testdaten für die Datenpipeline und zum Prüfen der Verarbeitungsergebnisse. Transformationsintegrationstests werden mit dem Direct Runner ausgeführt.
  • Einen Systemintegrationstest, der die Einbindung der Datenpipeline in Live-Datenquellen und -senken bewertet. Damit die Pipeline mit externen Systemen kommunizieren kann, müssen Sie die Tests mit entsprechenden Anmeldedaten für den Zugriff auf externe Dienste konfigurieren. Da Streamingpipelines unbegrenzt ausgeführt werden, müssen Sie entscheiden, wann und wie die laufende Pipeline beendet werden soll. Wenn Sie Systemintegrationstests mit dem Direct Runner ausführen, können Sie die Einbindung der Pipeline in andere Systeme schnell prüfen, ohne einen Dataflow-Job senden und auf dessen Abschluss warten zu müssen.

Entwickeln Sie Tests für Transformation und Systemintegration so, dass Fehlererkennung und Feedback schnell erfolgen, ohne die Entwicklerproduktivität zu verlangsamen. Für Tests mit längerer Ausführungszeit, die beispielsweise als Dataflow-Jobs ausgeführt werden, können Sie einen End-to-End-Test verwenden, der weniger häufig ausgeführt wird.

Sie können sich eine Datenpipeline als eine oder mehrere zusammengehörige Transformationen vorstellen. Sie können eine kapselnde zusammengesetzte Transformation für die Pipeline erstellen und mit TestPipeline einen Integrationstest für die gesamte Pipeline ausführen. Je nachdem, ob Sie die Pipeline im Batch- oder Streamingmodus testen möchten, stellen Sie Testdaten entweder mit der Transformation Create oder der Transformation TestStream bereit.

Testdaten für Integrationstests verwenden

In der Produktionsumgebung ist Ihre Pipeline wahrscheinlich in verschiedene Datenquellen und -senken eingebunden. Bei Unit- und Transformationsintegrationstests sollte jedoch die Prüfung der Geschäftslogik des Pipelinecodes im Mittelpunkt stehen. Zu diesem Zweck müssen Sie Testeingaben bereitstellen und die Ausgabe direkt prüfen. Mit diesem Ansatz können Sie nicht nur die Tests vereinfachen, sondern auch pipelinespezifische Probleme von solchen isolieren, die durch Datenquellen und -senken verursacht werden.

Batchpipelines testen

Bei Batchpipelines erstellen Sie mit der Create-Transformation ein PCollection-Objekt der Eingabetestdaten aus einer arbeitsspeicherinternen Standardsammlung wie einem List-Java-Objekt. Die Verwendung der Create-Transformation ist geeignet, wenn die Testdaten klein genug sind, um sie in Code einzuschließen. Sie können dann PAssert für die PCollection-Ausgabeobjekte verwenden, um die Richtigkeit des Pipelinecodes zu bestimmen. Dieser Ansatz wird vom Direct Runner und vom Dataflow-Runner unterstützt.

Das folgende Java-Code-Snippet zeigt Assertions für PCollection-Ausgabeobjekte aus einer zusammengesetzten Transformation, die einige oder alle Transformationen enthält, die eine Pipeline (WeatherStatsPipeline) bilden. Der Ansatz entspricht ungefähr dem Testen einzelner Transformationen einer Pipeline mithilfe von Unittests.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

Zum Testen des Windowing-Verhaltens können Sie auch die Create-Transformation verwenden, um Elemente mit Zeitstempeln zu erstellen, wie im folgenden Code-Snippet gezeigt:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Streamingpipelines testen

Streamingpipelines enthalten Annahmen, die festlegen, wie unbegrenzte Daten verarbeitet werden sollen. Diese Annahmen beziehen sich häufig auf die Aktualität von Daten unter realen Bedingungen und wirken sich daher auf die Richtigkeit der Pipeline aus, je nachdem, ob die Annahmen wahr oder falsch sind. Integrationstests für Streamingpipelines umfassen idealerweise Tests, die die nicht deterministische Natur des Streamingdateneingangs simulieren.

Zum Ermöglichen solcher Tests bietet das Apache Beam SDK die Klasse TestStream, um die Auswirkungen des Timings von Elementen (frühzeitige, pünktliche oder verspätete Daten) auf die Ergebnisse der Datenpipeline zu modellieren. Verwenden Sie diese Tests in Verbindung mit der Klasse PAssert, um das Ergebnis anhand der erwarteten Ergebnisse zu prüfen.

TestStream wird vom Direct Runner und vom Dataflow-Runner unterstützt. Im folgenden Codebeispiel wird eine TestStream-Transformation erstellt:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

Weitere Informationen zu TestStream finden Sie im Blog Testing Unbounded Pipelines in Apache Beam. Weitere Informationen zur Verwendung des Apache Beam SDK für Unittests finden Sie in der Apache Beam-Dokumentation.

Google Cloud-Dienste in Integrationstests verwenden

Der Direct Runner kann in Google Cloud-Dienste eingebunden werden, sodass Ad-hoc-Tests in der lokalen Umgebung sowie Systemintegrationstests nach Bedarf Pub/Sub, BigQuery und andere Dienste verwenden können. Bei Verwendung des Direct Runners wird die Pipeline entweder mit dem Nutzerkonto ausgeführt, das Sie mit dem gcloud-Befehlszeilentool konfiguriert haben, oder mit dem Dienstkonto, das Sie mit der Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS angegeben haben. Daher müssen Sie diesem Konto ausreichende Berechtigungen für alle erforderlichen Ressourcen erteilen, bevor Sie die Pipeline ausführen. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Bei komplett lokalen Integrationstests können Sie lokale Emulatoren für einige Google Cloud-Dienste verwenden. Lokale Emulatoren sind für Pub/Sub und Bigtable verfügbar.

Für die Ausführung von Systemintegrationstests für Streamingpipelines können Sie die Methode setBlockOnRun verwenden, die in der Schnittstelle DirectOptions definiert ist und bewirkt, dass der Direct Runner die Pipeline asynchron ausführt. Andernfalls blockiert die Pipelineausführung den aufrufenden übergeordneten Prozess (z. B. ein Skript in der Build-Pipeline), bis die Pipeline manuell beendet wird. Wenn Sie die Pipeline asynchron ausführen, können Sie mit der zurückgegebenen PipelineResult-Instanz die Ausführung der Pipeline abbrechen, wie im folgenden Codebeispiel gezeigt:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

End-to-End-Tests

End-to-End-Tests prüfen, ob die End-to-End-Pipeline korrekt funktioniert. Dazu wird sie im Dataflow Runner unter Bedingungen ausgeführt, die der Produktion sehr ähnlich sind. Die Tests prüfen, ob die Geschäftslogik mit dem Dataflow Runner ordnungsgemäß funktioniert und ob die Pipeline unter produktionsähnlichen Lasten die erwartete Leistung bringt. In der Regel werden End-to-End-Tests in einem dedizierten Google Cloud-Projekt ausgeführt, das als Vorproduktionsumgebung festgelegt ist.

Um Ihre Pipeline in verschiedenem Umfang zu testen, verwenden Sie verschiedene Arten von End-to-End-Tests, zum Beispiel:

  • Führen Sie kleine End-to-End-Tests mit einem geringen Teil (z. B. 1 %) des Test-Datasets aus, um die Pipelinefunktionalität in der Vorproduktionsumgebung schnell zu validieren.
  • Führen Sie umfassende End-to-End-Tests mit einem vollständigen Test-Dataset aus, um die Pipelinefunktionalität unter produktionsähnlichen Datenvolumen und Bedingungen zu validieren.

Für Streamingpipelines empfehlen wir Ihnen, Testpipelines parallel zur Produktionspipeline auszuführen, wenn sie dieselben Daten verwenden können. Damit haben Sie die Möglichkeit, Ergebnisse und operatives Verhalten wie Autoscaling und Leistung miteinander zu vergleichen.

Mithilfe von End-to-End-Tests können Sie vorhersagen, wie gut die Pipeline Ihre Produktions-SLOs erfüllt. Die Vorproduktionsumgebung testet Ihre Pipeline unter produktionsähnlichen Bedingungen. Im Rahmen von End-to-End-Tests werden Pipelines mit dem Dataflow Runner ausgeführt, um vollständige Referenz-Datasets zu verarbeiten, die Produktions-Datasets entsprechen oder stark ähneln.

Unter Umständen ist es nicht möglich, synthetische Daten für Tests zu generieren, die reale Daten genau simulieren. Ein Ansatz zur Lösung dieses Problems besteht darin, mithilfe von bereinigten Extrakten aus Produktionsdatenquellen Referenz-Datasets zu erstellen, in denen alle sensiblen Daten über entsprechende Transformationen de-identifiziert werden. Für diesen Zweck empfehlen wir die Verwendung des Schutzes sensibler Daten. Der Schutz sensibler Daten kann sensible Daten in einer Reihe von Inhaltstypen und Datenquellen erkennen und verschiedene De-Identifikationstechniken anwenden, darunter Datenentfernung, Maskierung, formaterhaltende Verschlüsselung und Datumsverschiebung.

Unterschiede bei End-to-End-Tests für Batch- und Streamingpipelines

Bevor Sie einen vollständigen End-to-End-Test mit einem großen Test-Datasets ausführen, empfiehlt es sich, einen Test mit einem kleineren Prozentsatz wie 1 % der Testdaten auszuführen. So können Sie das erwartete Verhalten in kürzerer Zeit prüfen. Wie bei Integrationstests mit dem Direct Runner können Sie PAssert für PCollection-Objekte verwenden, wenn Sie Pipelines mit dem Dataflow Runner ausführen. Weitere Informationen zu PAssert finden Sie im Abschnitt Unittests auf dieser Seite.

Je nach Anwendungsfall kann die Prüfung einer sehr großen Ausgabe von End-to-End-Tests unmöglich, aufwendig oder anderweitig schwierig sein. In diesem Fall haben Sie die Möglichkeit, stattdessen repräsentative Stichproben aus der Ergebnismenge der Ausgabe zu prüfen. Beispielsweise können Sie mit BigQuery Stichproben von Ausgabezeilen erstellen und mit einem Referenz-Dataset von erwarteten Ergebnissen vergleichen.

Bei Streamingpipelines kann sich die Simulation realistischer Streamingbedingungen mit synthetischen Daten als schwierig erweisen. Eine gängige Methode zum Bereitstellen von Streamingdaten für End-to-End-Tests besteht darin, Tests in Produktionsdatenquellen einzubinden. Wenn Sie Pub/Sub als Datenquelle verwenden, können Sie einen separaten Datenstrom für End-to-End-Tests über zusätzliche Abos vorhandener Themen aktivieren. Anschließend lassen sich die Ergebnisse verschiedener Pipelines, die dieselben Daten verwenden, miteinander vergleichen. Dies ist nützlich, um Kandidatenpipelines anhand von anderen Vorproduktions- und Produktionspipelines zu prüfen.

Das folgende Diagramm zeigt, wie diese Methode die parallele Ausführung einer Produktionspipeline und einer Testpipeline in verschiedenen Bereitstellungsumgebungen ermöglicht.

Diagramm: Parallele Ausführung einer Testpipeline und einer Produktionspipeline mithilfe einer einzigen Pub/Sub-Streamingquelle.

Im Diagramm lesen beide Pipelines aus demselben Pub/Sub-Thema, verwenden jedoch getrennte Abos. So können die beiden Pipelines dieselben Daten unabhängig voneinander verarbeiten und die Ergebnisse verglichen werden. Die Testpipeline verwendet ein eigenes Dienstkonto aus dem Produktionsprojekt und beansprucht daher kein Pub/Sub-Abonnentenkontingent für das Produktionsprojekt.

Im Gegensatz zu Batchpipelines werden Streamingpipelines so lange ausgeführt, bis sie explizit abgebrochen werden. Bei End-to-End-Tests müssen Sie entscheiden, ob die Pipeline weiter ausgeführt wird, etwa bis zum nächsten End-to-End-Test, oder an einem Punkt abgebrochen wird, der den Testabschluss darstellt, damit Sie die Ergebnisse überprüfen können.

Die Art der verwendeten Testdaten beeinflusst diese Entscheidung. Wenn Sie beispielsweise einen begrenzten Satz von Testdaten für die Streamingpipeline verwenden, können Sie die Pipeline abbrechen, wenn alle Elemente verarbeitet wurden. Wenn Sie dagegen eine echte Datenquelle wie ein vorhandenes, in der Produktion verwendetes Pub/Sub-Thema nutzen oder anderweitig kontinuierlich Testdaten generieren, möchten Sie Testpipelines vielleicht über einen längeren Zeitraum ausführen. Letzteres ermöglicht Ihnen, das Verhalten mit der Produktionsumgebung oder auch mit anderen Testpipelines zu vergleichen.