Diese Seite enthält Anleitungen und Empfehlungen zum Aktualisieren Ihrer Streamingpipelines. Beispielsweise müssen Sie möglicherweise ein Upgrade auf eine neuere Version des Apache Beam SDK ausführen oder Ihren Pipelinecode aktualisieren. Für unterschiedliche Szenarien sind verschiedene Optionen verfügbar.
Im Unterschied zu Batchpipelines, die nach Abschluss des Jobs beendet werden, müssen Streamingpipelines häufig kontinuierlich ausgeführt werden, um eine unterbrechungsfreie Verarbeitung zu ermöglichen. Daher müssen Sie beim Upgrade von Streamingpipelines die folgenden Punkte berücksichtigen:
- Möglicherweise müssen Sie Unterbrechungen der Pipeline minimieren oder vermeiden. In einigen Fällen können Sie eine vorübergehende Unterbrechung der Verarbeitung tolerieren, während eine neue Version einer Pipeline bereitgestellt wird. In anderen Fällen kann Ihre Anwendung Unterbrechungen nicht tolerieren.
- Pipeline-Aktualisierungsprozesse müssen Schemaänderungen so handhaben, dass Unterbrechungen der Nachrichtenverarbeitung und anderer damit verbundener Systeme minimiert werden. Wenn sich beispielsweise das Schema für Nachrichten in einer Pipeline zur Ereignisverarbeitung ändert, sind möglicherweise auch Schemaänderungen in nachgelagerten Datensenken erforderlich.
Je nach Pipeline- und Aktualisierungsanforderungen können Sie Streamingpipelines mit einer der folgenden Methoden aktualisieren:
Weitere Informationen zu Problemen, die während einer Aktualisierung auftreten können, und wie Sie diese verhindern können, finden Sie unter Ersatzjob validieren und Jobkompatibilitätsprüfung.
Best Practices
- Aktualisieren Sie die Apache Beam SDK-Version getrennt von Pipelinecodeänderungen.
- Testen Sie Ihre Pipeline nach jeder Änderung, bevor Sie zusätzliche Aktualisierungen vornehmen.
- Aktualisieren Sie regelmäßig die von Ihrer Pipeline verwendete Apache Beam SDK-Version.
Aktualisierungen im laufenden Betrieb durchführen
Sie können einige laufende Streamingpipelines aktualisieren, ohne den Job zu beenden. Dieses Szenario wird als Jobaktualisierung im laufenden betrieb bezeichnet. Aktualisierungen von laufenden Jobs sind nur unter bestimmten Umständen verfügbar:
- Der Job muss Streaming Engine verwenden.
- Der Job muss den Status „Wird ausgeführt“ haben.
- Sie ändern nur die Anzahl der Worker, die der Job verwendet.
Weitere Informationen finden Sie auf der Seite „Horizontales Autoscaling“ unter Autoscaling-Bereich festlegen.
Eine Anleitung zum Aktualisieren eines laufenden Jobs finden Sie unter Vorhandene Pipeline aktualisieren.
Ersatzjob starten
Wenn der aktualisierte Job mit dem vorhandenen Job kompatibel ist, können Sie Ihre Pipeline mit der Option update
aktualisieren. Wenn Sie einen vorhandenen Job ersetzen, führt ein neuer Job den aktualisierten Pipeline-Code aus.
Der Dataflow-Dienst behält den Jobnamen bei, führt den Ersatzjob aber mit einer aktualisierten Job-ID aus. Dieser Vorgang kann zu Ausfallzeiten führen, während der vorhandene Job angehalten wird, die Kompatibilitätsprüfung ausgeführt wird und der neue Job gestartet wird. Weitere Informationen finden Sie unter Auswirkungen des Ersetzens eines Jobs.
Dataflow führt eine Kompatibilitätsprüfung durch, um sicherzustellen, dass der aktualisierte Pipelinecode sicher auf der ausgeführten Pipeline bereitgestellt werden kann. Bestimmte Codeänderungen führen dazu, dass die Kompatibilitätsprüfung fehlschlägt, z. B. wenn Nebeneingaben einem vorhandenen Schritt hinzugefügt oder daraus entfernt werden. Wenn die Kompatibilitätsprüfung fehlschlägt, können Sie keine direkte Jobaktualisierung durchführen.
Eine Anleitung zum Starten eines Ersatzjobs finden Sie unter Ersatzjob starten.
Wenn die Pipelineaktualisierung nicht mit dem aktuellen Job kompatibel ist, müssen Sie die Pipeline beenden und ersetzen. Wenn Ihre Pipeline keine Ausfallzeiten tolerieren kann, führen Sie parallele Pipelines aus.
Pipelines stoppen und ersetzen
Wenn Sie die Verarbeitung vorübergehend anhalten können, können Sie die Pipeline abbrechen oder per Drain beenden und dann durch die aktualisierte Pipeline ersetzen. Das Abbrechen einer Pipeline führt dazu, dass Dataflow die Verarbeitung sofort stoppt und Ressourcen schnellstmöglich herunterfährt. Dies kann zu einem gewissen Verlust an gerade verarbeitet werdenden Daten führen. Diese werden als In-Flight-Daten bezeichnet. Zum Vermeiden von Datenverlusten ist in den meisten Fällen das Leeren von Daten die bevorzugte Aktion. Sie können Dataflow-Snapshots auch verwenden, um den Status einer Streamingpipeline zu speichern. So können Sie eine neue Version Ihres Dataflow-Jobs starten, ohne den Status zu verlieren. Weitere Informationen finden Sie unter Dataflow-Snapshots verwenden.
Durch das Leeren der Pipeline werden alle laufenden Fenster sofort geschlossen und alle Trigger ausgelöst. Obwohl "In-Flight-Daten" nicht verloren gehen, kann das Leeren der Pipeline dazu führen, dass die Daten in Fenstern unvollständig sind. In diesem Fall geben Prozessfenster nur teilweise oder unvollständige Ergebnisse aus. Weitere Informationen finden Sie unter Auswirkungen des Leerens eines Jobs. Nachdem der vorhandene Job abgeschlossen ist, können Sie einen neuen Streamingjob starten, der den aktualisierten Pipelinecode enthält. Dadurch kann die Verarbeitung fortgesetzt werden.
Bei dieser Methode kommt es zu einer Ausfallzeit zwischen dem Zeitpunkt, an dem der bestehende Streamingjob beendet wird, und dem Zeitpunkt, an dem die Ersatzpipeline zur Wiederaufnahme der Datenverarbeitung bereit ist. Das Abbrechen oder Beenden einer vorhandenen Pipeline und das anschließende Starten eines neuen Jobs mit der aktualisierten Pipeline ist jedoch weniger kompliziert als das Ausführen paralleler Pipelines.
Eine ausführlichere Anleitung finden Sie unter Dataflow-Job per Drain beenden. Nachdem Sie den aktuellen Job per Drain beendet haben, starten Sie einen neuen Job mit demselben Jobnamen.
Neuverarbeitung von Nachrichten mit Pub/Sub-Snapshot und Seek
In einigen Situationen müssen Sie nach dem Ersetzen oder Abbrechen einer geleerten Pipeline möglicherweise zuvor übermittelte Pub/Sub-Nachrichten noch einmal verarbeiten. Es kann beispielsweise sein, dass Sie die aktualisierte Geschäftslogik verwenden müssen, um Daten noch einmal zu verarbeiten. Pub/Sub Seek ist ein Feature, mit dem Sie Nachrichten aus einem Pub/Sub-Snapshot wiedergeben können. Sie können Pub/Sub Seek mit Dataflow verwenden, um Nachrichten ab dem Zeitpunkt der Erstellung des Abo-Snapshots noch einmal zu verarbeiten.
Während der Entwicklung und des Tests können Sie Pub/Sub Seek auch zum wiederholten Abspielen der bekannten Mitteilungen verwenden, um die Ausgabe Ihrer Pipeline zu kontrollieren. Wenn Sie Pub/Sub Seek verwenden, suchen Sie nicht nach einem Abo-Snapshot, wenn das Abo von einer Pipeline genutzt wird. Wenn Sie dies tun, kann das Seek die Wasserzeichenlogik von Dataflow ungültig machen und die genau einmalige Verarbeitung von Pub/Sub-Nachrichten beeinflussen.
Ein empfohlener gcloud CLI-Workflow für die Verwendung von Pub/Sub Seek mit Dataflow-Pipelines in einem Terminalfenster lautet so:
Verwenden Sie den Befehl
gcloud pubsub snapshots create
, um einen Snapshot des Abos zu erstellen:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Wenn Sie die Pipeline per Drain beenden oder abbrechen möchten, verwenden Sie den Befehl
gcloud dataflow jobs drain
oder den Befehlgcloud dataflow jobs cancel
:gcloud dataflow jobs drain JOB_ID
oder
gcloud dataflow jobs cancel JOB_ID
Verwenden Sie den Befehl
gcloud pubsub subscriptions seek
, um den Snapshot zu suchen:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Stellen Sie eine neue Pipeline bereit, die das Abo nutzt.
Parallele Pipelines ausführen
Wenn Sie während einer Aktualisierung eine Unterbrechung der Streamingpipeline vermeiden müssen, führen Sie parallele Pipelines aus. Erstellen Sie einen neuen Streamingjob mit dem aktualisierten Pipelinecode und führen Sie die neue Pipeline parallel zur vorhandenen Pipeline aus.
Verwenden Sie beim Erstellen der neuen Pipeline dieselbe Windowing-Strategie, die Sie für die vorhandene Pipeline verwendet haben. Führen Sie die vorhandene Pipeline so lange aus, bis das Wasserzeichen den Zeitstempel des frühesten vollständigen Fensters überschreitet, das von der aktualisierten Pipeline verarbeitet wird. Leeren Sie die vorhandene Pipeline dann oder brechen Sie sie ab. Die aktualisierte Pipeline läuft an ihrer Stelle weiter und übernimmt die Bearbeitung effektiv selbstständig.
Im folgenden Diagramm wird dieser Vorgang veranschaulicht.
Im Diagramm ist Pipeline B der aktualisierte Job, der die Pipeline A übernimmt. Der Wert t ist der Zeitstempel des frühesten vollständigen Fensters, das von Pipeline B verarbeitet wird. Der Wert w ist das Wasserzeichen für Pipeline A. Der Einfachheit halber wird von einem perfekten Wasserzeichen ohne verspätete Daten ausgegangen. Die Verarbeitung und die Echtzeit werden auf der horizontalen Achse dargestellt. Beide Pipelines verwenden feste (rollierende) 5-Minuten-Fenster. Die Ergebnisse werden ausgelöst, nachdem das Wasserzeichen das Ende des Fensters passiert.
Da die gleichzeitige Ausgabe während des Zeitraums auftritt, in dem sich die beiden Pipelines überlappen, konfigurieren Sie die beiden Pipelines so, dass Ergebnisse an verschiedene Ziele geschrieben werden. Nachgelagerte Systeme können dann eine Abstraktion der beiden Zielsenken wie eine Datenbankansicht verwenden, um die kombinierten Ergebnisse abzufragen. Diese Systeme können die Abstraktion auch verwenden, um Ergebnisse aus dem sich überlappenden Zeitraum zu deduplizieren.
Im folgenden Beispiel wird der Ansatz zur Verwendung einer einfachen Pipeline beschrieben, die Eingabedaten aus Pub/Sub liest, einige Verarbeitungsschritte ausführt und die Ergebnisse in BigQuery schreibt.
Im Ausgangszustand wird die vorhandene Streamingpipeline (Pipeline A) ausgeführt und liest Nachrichten aus einem Pub/Sub-Thema (Thema) mit einem Abo (Abo A). Die Ergebnisse werden in eine BigQuery-Tabelle geschrieben (Tabelle A). Die Ergebnisse werden über eine BigQuery-Ansicht verarbeitet, die als Fassade dient, um die zugrundeliegenden Tabellenänderungen zu verdecken. Dieser Prozess ist eine Anwendung einer Gestaltungsmethode, die als Fassadenmuster bezeichnet wird. Das folgende Diagramm zeigt den Ausgangszustand.
Sie erstellen ein neues Abo (Abo B) für die aktualisierte Pipeline. Sie stellen die aktualisierte Pipeline (Pipeline B) bereit, die mit Abo B aus dem Pub/Sub-Thema (Thema) liest und in eine separate BigQuery-Tabelle (Tabelle B) schreibt. Das folgende Diagramm veranschaulicht diesen Ablauf.
An diesem Punkt werden Pipeline A und Pipeline B parallel ausgeführt und die Ergebnisse in separate Tabellen geschrieben. Sie erfassen die Zeit t als Zeitstempel des frühesten vollständigen Fensters, das von Pipeline B verarbeitet wird.
Wenn das Wasserzeichen von Pipeline A Zeitt überschreitet, beenden Sie Pipeline A per Drain“ Wenn Sie die Pipeline so beenden, werden alle geöffneten Fenster geschlossen und die Verarbeitung für In-Flight-Daten wird abgeschlossen. Wenn die Pipeline Fenster und vollständige Fenster enthält (vorausgesetzt, dass keine späten Daten vorhanden sind), lassen Sie vor dem Leeren von Pipeline A beide Pipelines laufen, bis Sie alle überlappenden Fenster haben. Sie beenden den Streamingjob für Pipeline A, nachdem alle In-Flight-Daten verarbeitet und in Tabelle A geschrieben wurden. Das folgende Diagramm zeigt diese Phase.
Zu diesem Zeitpunkt wird nur Pipeline B ausgeführt. Sie können auch eine BigQuery-Ansicht (Fassadenansicht) abfragen, die als Fassade für Tabelle A und Tabelle B fungiert. Konfigurieren Sie für Zeilen mit demselben Zeitstempel in beiden Tabellen die Ansicht so, dass die Zeilen zurückgegeben werden aus Tabelle B oder, wenn die Zeilen nicht vorhanden sind in Tabelle B , ein Fallback auf Tabelle A passiert“ Das folgende Diagramm zeigt die Ansicht (Fassadenansicht), die sowohl aus Tabelle A als auch aus Tabelle B liest.
An diesem Punkt können Sie Abo A löschen.
Wenn Probleme bei einer neuen Pipelinebereitstellung erkannt werden, können parallele Pipelines das Rollback vereinfachen. In diesem Beispiel möchten Sie möglicherweise, dass Pipeline A ausgeführt wird, während Sie Pipeline B auf den korrekten Betrieb hin beobachten. Wenn Probleme mit Pipeline B auftreten, können Sie ein Rollback auf Pipeline A vornehmen.
Beschränkungen
Dieser Ansatz unterliegt den folgenden Einschränkungen:
- Wenn Sie zwei Pipelines über dieselbe Eingabe ausführen, werden wahrscheinlich doppelte Daten an der Ausgabe generiert. Das nachgelagerte System muss die doppelten Daten erkennen und tolerieren.
- Beim Lesen aus einer Pub/Sub-Quelle wird die Verwendung desselben Abos für mehrere Pipelines nicht empfohlen und kann zu Richtigkeitsproblemen führen. In einigen Anwendungsfällen wie ETL-Pipelines (Extrahieren, Transformieren, Laden) kann die Verwendung desselben Abos in zwei Pipelines jedoch die Duplizierung reduzieren. Probleme mit dem Autoscaling sind in diesem Szenario wahrscheinlich, können jedoch mithilfe der Funktion zur Aktualisierung von laufenden Jobs behoben werden. Weitere Informationen finden Sie unter Autoscaling für Pub/Sub-Streamingpipelines optimieren.
- Beim Lesen aus einer Pub/Sub-Quelle werden durch die Verwendung eines zweiten Abos Duplikate generiert, jedoch keine Probleme mit der Datenrichtigkeit und dem Autoscaling.
Schemamutationen verarbeiten
Datenverarbeitungssysteme müssen im Laufe der Zeit häufig Schemamutationen berücksichtigen, manchmal aufgrund von geänderten Geschäftsanforderungen oder aus technischen Gründen. Die Anwendung von Schemaaktualisierungen erfordert in der Regel eine sorgfältige Planung und Ausführung, um Unterbrechungen der Geschäftsinformationssysteme zu vermeiden.
Stellen Sie sich eine einfache Pipeline vor, die Nachrichten mit JSON-Nutzlasten aus einem Pub/Sub-Thema liest. Die Pipeline konvertiert jede Nachricht in eine TableRow
-Instanz und schreibt die Zeilen dann in eine BigQuery-Tabelle. Das Schema der Ausgabetabelle ähnelt den Nachrichten, die von der Pipeline verarbeitet werden.
Im folgenden Diagramm wird das Schema als Schema A bezeichnet.
Im Laufe der Zeit kann sich das Nachrichtenschema auf anspruchsvolle Weise mutieren. Beispielsweise werden Felder hinzugefügt, entfernt oder ersetzt. Schema A entwickelt sich zu einem neuen Schema. In der folgenden Diskussion wird das neue Schema als Schema B bezeichnet. In diesem Fall muss Pipeline A aktualisiert werden und das Ausgabetabellenschema muss Schema B unterstützen.
Für die Ausgabetabelle können Sie einige Schemamutationen ohne Ausfallzeit ausführen.
Beispielsweise können Sie neue Felder hinzufügen oder Spaltenmodi lockern, indem Sie z. B. ohne AusfallzeitREQUIRED
in NULLABLE
ändern.
Diese Mutationen wirken sich in der Regel nicht auf vorhandene Abfragen aus. Schemamutationen, die vorhandene Schemafelder ändern oder entfernen, unterbrechen jedoch Abfragen oder führen zu anderen Störungen. Mit dem folgenden Ansatz werden Änderungen ohne Ausfallzeiten berücksichtigt.
Trennen Sie die Daten, die von der Pipeline in eine Hauptkontotabelle und in eine oder mehrere Staging-Tabellen geschrieben werden. Die Hauptkontotabelle speichert die von der Pipeline geschriebenen Verlaufsdaten. Staging-Tabellen speichern die neueste Pipelineausgabe. Sie können eine BigQuery-Fassadenansicht über die Hauptkonto- und Staging-Tabellen definieren, mit denen Nutzer sowohl Verlaufsdaten als auch aktuelle Daten abfragen können.
Das folgende Diagramm überarbeitet den vorherigen Pipeline-Ablauf und fügt eine Staging-Tabelle (Staging-Tabelle A), eine Hauptkontotabelle und eine Fassadenansicht hinzu.
Im überarbeiteten Ablauf verarbeitet Pipeline A Nachrichten, die Schema A verwenden, und schreibt die Ausgabe in die Staging-Tabelle A, die über ein kompatibles Schema verfügt. Die Hauptkontotabelle enthält Verlaufsdaten, die von früheren Versionen der Pipeline geschrieben wurden, sowie Ergebnisse, die regelmäßig aus der Staging-Tabelle zusammengeführt werden. Nutzer können aktuelle Daten, darunter Verlaufsdaten und Echtzeitdaten, über die Fassadenansicht abfragen.
Wenn das Nachrichtenschema von Schema A zu Schema B mutiert, können Sie den Pipelinecode so aktualisieren, dass er mit Nachrichten kompatibel ist, die Schema B verwenden. Die vorhandene Pipeline muss mit der neuen Implementierung aktualisiert werden. Durch parallele Pipelines können Sie dafür sorgen, dass die Verarbeitung von Streamingdaten ohne Unterbrechung fortgesetzt wird. Das Beenden und Ersetzen von Pipelines führt zu einer Unterbrechung der Verarbeitung, da über einen bestimmten Zeitraum keine Pipeline ausgeführt wird.
Die aktualisierte Pipeline schreibt in eine zusätzliche Staging-Tabelle (Staging-Tabelle B), die Schema B verwendet. Sie können einen orchestrierten Workflow verwenden, um die neue Staging-Tabelle zu erstellen, bevor Sie die Pipeline aktualisieren. Aktualisieren Sie auch die Fassadenansicht, um Ergebnisse aus der neuen Staging-Tabelle aufzunehmen. Dazu können Sie einen zugehörigen Workflowschritt verwenden.
Das folgende Diagramm stellt den aktualisierten Ablauf dar, der Staging-Tabelle B mit Schema B zeigt und wie die Fassadenansicht aktualisiert wurde, um Inhalte aus der Mastertabelle sowie aus beiden Staging-Tabellen aufzunehmen.
Als ein von der Pipelineaktualisierung separater Prozess können Sie die Staging-Tabellen entweder periodisch oder nach Bedarf in der Hauptkontotabelle zusammenführen. Das folgende Diagramm zeigt, wie die Staging-Tabelle A mit der Hauptkontotabelle zusammengeführt wird.