Vorhandene Pipeline aktualisieren

In diesem Dokument wird beschrieben, wie Sie einen laufenden Streamingjob aktualisieren. Sie können Ihren vorhandenen Dataflow-Job aus folgenden Gründen aktualisieren:

  • Sie möchten den Pipelinecode verbessern oder erweitern.
  • Sie möchten Programmfehler im Pipelinecode korrigieren.
  • Sie möchten die Pipeline aktualisieren, um Änderungen des Datenformats, Versionsänderungen oder andere Änderungen in der Datenquelle zu berücksichtigen.
  • Sie möchten eine Sicherheitslücke in Bezug auf Container-Optimized OS für alle Dataflow-Worker beheben.
  • Sie möchten eine Apache Beam-Streamingpipeline skalieren, um eine andere Anzahl von Workern zu verwenden.

Sie können Jobs auf zwei Arten aktualisieren:

  • Aktualisierung von In-Flight-Jobs: Bei Streamingjobs, die die Streaming Engine nutzen, können Sie die Joboptionen min-num-workers und max-num-workers aktualisieren, ohne den Job zu beenden oder die Job-ID zu ändern.
  • Ersatzjob: Um aktualisierten Pipelinecode auszuführen oder Joboptionen zu aktualisieren, die von Aktualisierungen von laufenden Jobs nicht unterstützt werden, starten Sie einen neuen Job, der den vorhandenen Job ersetzt. Prüfen Sie die relevante Jobgrafik, bevor Sie den neuen Job starten, um zu prüfen, ob ein Ersatzjob gültig ist.

Wenn Sie Ihren Job aktualisieren, führt der Dataflow-Dienst eine Prüfung der Kompatibilität zwischen dem derzeit ausgeführten und dem potenziellen Ersatzjob durch. Die Kompatibilitätsprüfung gewährleistet, dass beispielsweise Zwischenzustandsinformationen und gepufferte Daten vom vorigen Job auf den Ersatzjob übertragen werden können.

Sie können auch die integrierte Logging-Infrastruktur des Apache Beam SDK verwenden, um Informationen zu aktualisieren, wenn Sie Ihren Job aktualisieren. Weitere Informationen finden Sie unter Mit Pipelinelogs arbeiten. Verwenden Sie die Logging-Ebene DEBUG, um Probleme mit dem Pipelinecode zu identifizieren.

Aktualisierung der Option des laufenden Jobs

Für einen Streamingjob, der Streaming Engine verwendet, können Sie die folgenden Joboptionen aktualisieren, ohne den Job zu beenden oder die Job-ID zu ändern:

  • min-num-workers: Die Mindestanzahl von Compute Engine-Instanzen.
  • max-num-workers: Die Mindestanzahl von Compute Engine-Instanzen.
  • worker-utilization-hint: die CPU-Zielauslastung im Bereich [0,1, 0,9]

Für andere Jobaktualisierungen müssen Sie den aktuellen Job durch den aktualisierten Job ersetzen. Weitere Informationen finden Sie unter Ersatzjob starten.

Aktualisierung während der Übertragung durchführen

Führen Sie die folgenden Schritte aus, um eine Aktualisierung der Option eines laufenden Jobs durchzuführen.

gcloud

Führen Sie den Befehl gcloud dataflow jobs update-options aus:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Ersetzen Sie Folgendes:

  • REGION: die ID der Region des Jobs
  • MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]
  • JOB_ID: Die ID des zu aktualisierenden Jobs

Sie können --min-num-workers, --max-num-workers und worker-utilization-hint auch einzeln aktualisieren.

REST

Verwenden Sie die Methode projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS,
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Ersetzen Sie Folgendes:

  • MASK: eine durch Kommas getrennte Liste von Parametern, die aktualisiert werden sollen, aus folgenden Elementen:
    • runtime_updatable_params.max_num_workers
    • runtime_updatable_params.min_num_workers
    • runtime_updatable_params.worker_utilization_hint
  • PROJECT_ID: Die Google Cloud-Projekt-ID des Dataflow-Jobs
  • REGION: die ID der Region des Jobs
  • JOB_ID: Die ID des zu aktualisierenden Jobs
  • MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • TARGET_UTILIZATION: ein Wert im Bereich [0,1, 0,9]

Sie können min_num_workers, max_num_workers und worker_utilization_hint auch einzeln aktualisieren. Geben Sie im Abfrageparameter updateMask an, welche Parameter aktualisiert werden sollen, und fügen Sie die aktualisierten Werte in das Feld runtimeUpdatableParams des Anfragetexts ein. Im folgenden Beispiel wird min_num_workers aktualisiert:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Ein Job muss sich im Status "Wird ausgeführt" befinden, um für Aktualisierungen im laufenden Betrieb infrage zu kommen. Wenn der Job nicht gestartet wurde oder bereits abgebrochen wurde, tritt ein Fehler auf. Ebenso warten Sie beim Starten eines Ersatzjobs, bis er ausgeführt wird, bevor Sie In-Flight-Aktualisierungen an den neuen Job senden.

Nachdem Sie eine Aktualisierungsanfrage gesendet haben, warten Sie, bis die Anfrage abgeschlossen ist, bevor Sie eine weitere Aktualisierung senden. In den Joblogs sehen Sie, wann die Anfrage abgeschlossen ist.

Ersatzjob validieren

Prüfen Sie die relevante Jobgrafik, bevor Sie den neuen Job starten, um zu prüfen, ob ein Ersatzjob gültig ist. In Dataflow ist eine Jobgrafik eine grafische Darstellung einer Pipeline. Durch die Validierung der Jobgrafik verringern Sie das Risiko, dass in der Pipeline nach der Aktualisierung Fehler oder ein Pipelineversagen auftreten. Darüber hinaus können Sie Aktualisierungen validieren, ohne den ursprünglichen Job beenden zu müssen. So entsteht keine Ausfallzeit für den Job.

Führen Sie die Schritte zum Starten eines Ersatzjobs aus, um Ihre Jobgrafik zu validieren. Fügen Sie in den Aktualisierungsbefehl die Dataflow-Dienstoption graph_validate_only ein.

Java

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --jobName in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflowServiceOptions=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transformNameMapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Python

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --job_name in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflow_service_options=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Go

  • Übergeben Sie die Option --update.
  • Legen Sie für die Option --job_name denselben Namen wie für den zu aktualisierenden Job fest.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflow_service_options=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.

gcloud

Verwenden Sie den Befehl gcloud dataflow flex-template run mit der Option additional-experiments, um die Jobgrafik für einen Job mit flexibler Vorlage zu validieren:

  • Übergeben Sie die Option --update.
  • Legen Sie für JOB_NAME denselben Namen wie für den Job fest, den Sie aktualisieren möchten.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Option --additional-experiments=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform-name-mappings übergeben.

Beispiel:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

Ersetzen Sie JOB_NAME durch den Namen des Jobs, den Sie aktualisieren möchten.

REST

Verwenden Sie das Feld additionalExperiments im Objekt FlexTemplateRuntimeEnvironment (flexible Vorlagen) oder RuntimeEnvironment.

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

Mit der Dienstoption graph_validate_only werden nur Pipelineaktualisierungen validiert. Verwenden Sie diese Option nicht beim Erstellen oder Starten von Pipelines. Starten Sie einen Ersatzjob ohne die Dienstoption graph_validate_only, um die Pipeline zu aktualisieren.

Wenn die Validierung des Jobdiagramms erfolgreich ist, zeigen Jobstatus und Joblogs die folgenden Statuswerte an:

  • Der Jobstatus lautet JOB_STATE_DONE.
  • In der Google Cloud Console lautet der Jobstatus Succeeded.
  • Die folgende Meldung wird in den Joblogs angezeigt:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

Wenn die Validierung des Jobdiagramms fehlschlägt, zeigen Jobstatus und Joblogs die folgenden Statuswerte an:

  • Der Jobstatus lautet JOB_STATE_FAILED.
  • In der Google Cloud Console lautet der Jobstatus Failed.
  • In den Joblogs wird eine Meldung angezeigt, die den Inkompatibilitätsfehler beschreibt. Der Inhalt der Nachricht hängt vom Fehler ab.

Ersatzjob starten

Sie können einen vorhandenen Job aus folgenden Gründen ersetzen:

  • So führen Sie aktualisierten Pipelinecode aus.
  • So aktualisieren Sie Joboptionen, die keine In-Flight-Updates unterstützen

Prüfen Sie die relevante Jobgrafik, bevor Sie den neuen Job starten, um zu prüfen, ob ein Ersatzjob gültig ist.

Legen Sie beim Starten eines Ersatzjobs die folgenden Pipelineoptionen fest, um den Aktualisierungsvorgang zusätzlich zu den normalen Optionen des Jobs auszuführen:

Java

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --jobName in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transformNameMapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Python

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --job_name in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Go

  • Übergeben Sie die Option --update.
  • Legen Sie für die Option --job_name denselben Namen wie für den zu aktualisierenden Job fest.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.

gcloud

Verwenden Sie den Befehl gcloud dataflow flex-template run, um einen flexiblen Vorlagenjob mit der gcloud CLI zu aktualisieren. Das Aktualisieren anderer Jobs mit der gcloud CLI wird nicht unterstützt.

  • Übergeben Sie die Option --update.
  • Legen Sie für JOB_NAME denselben Namen wie für den Job fest, den Sie aktualisieren möchten.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform-name-mappings übergeben.

REST

In dieser Anleitung wird gezeigt, wie Sie Jobs ohne Vorlagen mit der REST API aktualisieren. Informationen zum Aktualisieren eines Jobs mit klassischer Vorlage über die REST API finden Sie unter Streamingjob aus benutzerdefinierter Vorlage aktualisieren. Informationen zum Aktualisieren eines Jobs mit flexibler Vorlage mit der REST API finden Sie unter Job mit flexibler Vorlage aktualisieren.

  1. Rufen Sie die Ressource job für den Job, den Sie ersetzen möchten, mit der Methode projects.locations.jobs.get ab. Verwenden Sie den Abfrageparameter view mit dem Wert JOB_VIEW_DESCRIPTION. Wenn Sie JOB_VIEW_DESCRIPTION verwenden, wird die Datenmenge in der Antwort begrenzt, sodass Ihre nachfolgende Anfrage die Größenbeschränkungen nicht überschreitet. Für detailliertere Jobinformationen verwenden Sie den Wert JOB_VIEW_ALL.

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    Ersetzen Sie die folgenden Werte:

    • PROJECT_ID: Die Google Cloud-Projekt-ID des Dataflow-Jobs
    • REGION: Die Region des Jobs, den Sie aktualisieren möchten
    • JOB_ID: Die Job-ID des Jobs, den Sie aktualisieren möchten
  2. Verwenden Sie zum Aktualisieren des Feldes die Methode projects.locations.jobs.create. Verwenden Sie im Anfragetext die abgerufene job-Ressource.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    Ersetzen Sie Folgendes:

    • JOB_ID: Die Job-ID des Jobs, den Sie aktualisieren möchten.
    • JOB_NAME: Der Jobname des Jobs, den Sie aktualisieren möchten.

    Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit dem Feld transformNameMapping übergeben.

  3. Optional: Zum Senden der Anfrage mit curl (Linux, macOS oder Cloud Shell) speichern Sie die Anfrage in einer JSON-Datei und führen Sie dann den folgenden Befehl aus:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    Ersetzen Sie FILE_PATH durch den Pfad zur JSON-Datei, die den Anfragetext enthält.

Namen des Ersatzjobs angeben

Java

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --jobName übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

Python

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --job_name übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

Go

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --job_name übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

gcloud

Wenn Sie den Ersatzjob starten, muss JOB_NAME genau mit dem Namen des zu ersetzenden Jobs übereinstimmen.

REST

Legen Sie den Wert des Felds replaceJobId auf die Job-ID des zu aktualisierenden Jobs fest. Wählen Sie den vorigen Job in der Dataflow-Monitoring-Oberfläche aus, um den richtigen Jobnamen zu ermitteln. Suchen Sie dann in der Seitenleiste Jobdetails das Feld Job-ID.

Wählen Sie den vorigen Job in der Dataflow-Monitoring-Oberfläche aus, um den richtigen Jobnamen zu ermitteln. Suchen Sie dann in der Seitenleiste Jobinfo das Feld Jobname:

Die Seitenleiste „Jobdetails“ für einen laufenden Dataflow-Job.
Abbildung 1: Die Seitenleiste „Jobdetails“ für einen laufenden Dataflow-Job mit dem Feld „Jobname“.

Alternativ können Sie die Liste der vorhandenen Jobs über die Dataflow-Befehlszeilenschnittstelle abfragen. Geben Sie den Befehl gcloud dataflow jobs list in Ihr Shell- oder Terminal-Fenster ein, um eine Liste der Dataflow-Jobs in Ihrem Google Cloud-Projekt zu erhalten, und suchen Sie nach dem NAME für den Job, den Sie ersetzen möchten:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Transformationszuordnung erstellen

Falls die Namen der Transformationen in der Ersatz-Pipeline von jenen in der vorigen Pipeline abweichen, benötigt der Dataflow-Dienst eine Transformationszuordnung. Die Transformationszuordnung dient dazu, die benannten Transformationen des vorigen Pipelinecodes den Namen im neuen Pipelinecode zuzuordnen.

Java

Übergeben Sie die Zuordnung mit der Befehlszeilenoption --transformNameMapping im folgenden Format:

--transformNameMapping= . 
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transformNameMapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transformNameMapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

Übergeben Sie die Zuordnung mit der Befehlszeilenoption --transform_name_mapping im folgenden Format:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform_name_mapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform_name_mapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Go

Übergeben Sie die Zuordnung mit der Befehlszeilenoption --transform_name_mapping im folgenden Format:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform_name_mapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform_name_mapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

Übergeben Sie die Zuordnung mit der Option --transform-name-mappings im folgenden Format:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform-name-mappings nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform-name-mappings müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

Übergeben Sie die Zuordnung mit dem Feld transformNameMapping im folgenden Format:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

Beachten Sie, dass Sie in transformNameMapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Transformationsnamen bestimmen

Der Transformationsname in einer Instanz in der Zuordnung ist der Name, den Sie bei der Anwendung der Transformation in Ihrer Pipeline angegeben haben. Beispiel:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Go

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

Außerdem können Sie die Transformationsnamen des vorigen Jobs durch Untersuchung der Ausführungsgrafik des Jobs in der Dataflow-Überwachungsoberfläche abrufen:

Ausführungsgrafik für eine WordCount-Pipeline
Abbildung 2: Die Ausführungsgrafik für eine WordCount-Pipeline, wie sie in der Dataflow Monitoring-Oberfläche angezeigt wird.

Benennung von zusammengesetzten Transformationen

Transformationsnamen sind in Abhängigkeit von der Hierarchie in der Pipeline hierarchisch aufgebaut. Wenn eine Pipeline über eine zusammengesetzte Transformation verfügt, werden die eingebetteten Transformationen mit Bezug auf die sie enthaltende Transformation benannt. Nehmen wir beispielsweise an, Ihre Pipeline enthält eine zusammengesetzte Transformation namens CountWidgets, die wiederum eine Transformation namens Parse enthält. Der vollständige Name der Transformation lautet CountWidgets/Parse. Dies ist der Name, den Sie in der Transformationszuordnung angeben müssen.

Wenn die neue Pipeline eine zusammengesetzte Transformation einem anderen Namen zuordnet, werden auch alle darin verschachtelten Transformationen automatisch umbenannt. Sie müssen die neuen Namen für die inneren Transformationen in der Transformationszuordnung angeben.

Transformationshierarchie refaktorieren

Wenn die Hierarchie der Ersatzpipeline von jener der vorigen Pipeline abweicht, müssen Sie die Zuordnung ausdrücklich deklarieren. Möglicherweise sehen Sie eine andere Transformationshierarchie, weil Sie die zusammengesetzten Transformationen refaktoriert haben oder die Pipeline von einer zusammengesetzten Transformation aus einer Mediathek abhängt, die sich geändert hat.

Beispiel: In der vorigen Pipeline wurde die zusammengesetzte Transformation CountWidgets angewendet, die eine innere Transformation namens Parse enthielt. Die Ersatzpipeline refaktoriert CountWidgets und verschachtelt Parse in eine andere Transformation namens Scan. Damit die Aktualisierung gelingen kann, müssen Sie den vollständigen Transformationsnamen in der vorigen Pipeline (CountWidgets/Parse) ausdrücklich dem Transformationsnamen in der neuen Pipeline (CountWidgets/Scan/Parse) zuordnen:

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

Auswirkungen des Ersetzens eines Jobs

Wenn Sie einen vorhandenen Job ersetzen, führt ein neuer Job den aktualisierten Pipelinecode aus. Der Dataflow-Dienst behält den Jobnamen bei, führt den Ersatzjob aber mit einer aktualisierten Job-ID aus. Dieser Vorgang kann zu Ausfallzeiten führen, während der vorhandene Job angehalten wird, die Kompatibilitätsprüfung ausgeführt wird und der neue Job gestartet wird.

Der Ersatzjob behält folgende Elemente bei:

Zwischenzustandsdaten

Zwischenzustandsdaten aus dem vorherigen Job bleiben erhalten. Zustandsdaten enthalten keine In-Memory-Caches. Wenn Sie bei der Aktualisierung Ihrer Pipeline die Cache-Daten beibehalten möchten, refaktorieren Sie Ihre Pipeline so, dass Caches in Statusdaten oder Nebeneingaben umgewandelt werden. Weitere Informationen zur Verwendung von Nebeneingaben finden Sie in der Apache Beam-Dokumentation unter Nebeneingabemuster.

Streamingpipelines haben Größenbeschränkungen für ValueState und für Nebeneingaben. Wenn Sie große Caches haben, die Sie beibehalten möchten, müssen Sie daher möglicherweise einen externen Speicher wie Memorystore oder Bigtable verwenden.

In-Flight-Daten

In-Flight-Daten werden durch die Transformationen in der neuen Pipeline verarbeitet. Ob zusätzliche Transformationen, die Sie dem Ersatz-Pipelinecode hinzufügen, wirksam werden, hängt jedoch davon ab, wo die Datensätze gepuffert werden. In diesem Beispiel verfügt Ihre vorhandene Pipeline über die folgenden Transformationen:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

Sie können Ihren Job folgendermaßen durch neuen Pipelinecode ersetzen:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Selbst wenn Sie eine Transformation zum Herausfiltern von Strings, die mit dem Buchstaben "A" beginnen, hinzugefügen, sieht die nächste Transformation (FormatStrings) möglicherweise trotzdem gepufferte oder in Übertragung befindliche (In-Flight-)Strings des vorigen Jobs, die mit "A" beginnen.

Windowing ändern

Sie können die Windowing- und Trigger-Strategien für die PCollection-Objekte in der Ersatzpipeline ändern. Dabei ist jedoch Vorsicht geboten. Eine Änderung der Windowing- oder Trigger-Strategien wirkt sich nicht auf bereits gepufferte oder anderweitig in Übertragung befindliche Daten aus.

Es empfiehlt sich, nur geringfügige Änderungen am Windowing der Pipeline vorzunehmen, z. B. eine Änderung der Dauer von festen oder fließenden Zeitfenstern. Durch größere Änderungen an Windowing- oder Trigger-Strategien, z. B. eine Änderung des Windowing-Algorithmus, kann die Pipelineausgabe unvorhersehbare Ergebnisse liefern.

Kompatibilitätsprüfung von Jobs

Wenn Sie Ihren Ersatzjob starten, führt der Dataflow-Dienst eine Prüfung der Kompatibilität zwischen dem Ersatzjob und dem vorigen Job durch. Bei Bestehen der Kompatibilitätsprüfung wird der vorige Job angehalten. Der Ersatzjob startet dann unter demselben Jobnamen im Dataflow-Dienst. Bei Nichtbestehen der Kompatibilitätsprüfung wird der vorige Job weiterhin im Dataflow-Dienst ausgeführt und der Ersatzjob gibt einen Fehler zurück.

Java

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie pipeline.run().waitUntilFinish() in Ihrem Pipelinecode.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Alternativ können Sie den Status Ihres Ersatzjobs in der Monitoring-Oberfläche von Dataflow überwachen. Wenn Ihr Job erfolgreich gestartet wurde, hat er auch die Kompatibilitätsprüfung bestanden.

Python

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie pipeline.run().wait_until_finish() in Ihrem Pipelinecode.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Alternativ können Sie den Status Ihres Ersatzjobs in der Monitoring-Oberfläche von Dataflow überwachen. Wenn Ihr Job erfolgreich gestartet wurde, hat er auch die Kompatibilitätsprüfung bestanden.

Go

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Insbesondere müssen Sie die nicht blockierende Ausführung mit dem Flag --execute_async oder --async einstellen. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update und ohne das Flag --execute_async oder --async aus.
  2. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich besteht.
  3. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

gcloud

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie bei Java-Pipelines pipeline.run().waitUntilFinish() in Ihrem Pipelinecode. Verwenden Sie bei Python-Pipelines pipeline.run().wait_until_finish() in Ihrem Pipelinecode. Führen Sie bei Go-Pipelines die Schritte auf dem Tab „Go“ aus.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

REST

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  • Verwenden Sie bei Java-Pipelines pipeline.run().waitUntilFinish() in Ihrem Pipelinecode. Verwenden Sie bei Python-Pipelines pipeline.run().wait_until_finish() in Ihrem Pipelinecode. Führen Sie bei Go-Pipelines die Schritte auf dem Tab „Go“ aus.
  • Führen Sie Ihr Ersatz-Pipelineprogramm mit dem Feld replaceJobId aus.
  • Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich besteht.
  • Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Bei der Kompatibilitätsprüfung wird die bereitgestellte Transformationszuordnung verwendet, um sicherzustellen, dass Dataflow Daten in einem Zwischenzustand aus den Schritten des vorigen Jobs an den Ersatzjob übertragen kann. Außerdem sorgt die Kompatibilitätsprüfung dafür, dass die PCollection-Objekte in Ihrer Pipeline dieselben Coder verwenden. Wenn Sie einen Coder ändern, kann die Kompatibilitätsprüfung fehlschlagen, da etwaige In-Flight-Daten oder gepufferte Datensätze in der Ersatzpipeline möglicherweise nicht richtig serialisiert werden.

Kompatibilitätsbrüche verhindern

Bestimmte Unterschiede zwischen der vorigen und der Ersatzpipeline können zum Nichtbestehen der Kompatibilitätsprüfung führen. Zu diesen Unterschieden gehören:

  • Änderung des Pipelinediagramms ohne Angabe einer Zuordnung: Wenn Sie einen Job aktualisieren, wird von Dataflow versucht, die Transformationen aus dem vorigen und dem Ersatzjob einander zuzuordnen. Durch diesen Abgleichsprozess kann Dataflow Zwischenzustandsdaten für jeden Schritt übertragen. Wenn Sie Schritte umbenennen oder entfernen, müssen Sie eine Transformationszuordnung bereitstellen, damit Dataflow Zustandsdaten entsprechend zuordnen kann.
  • Änderung der Nebeneingaben für einen Schritt: Wenn Sie einer Transformation in der Ersatzpipeline Nebeneingaben hinzufügen oder sie aus ihr entfernen, schlägt die Kompatibilitätsprüfung fehl.
  • Änderung des Codes für einen Schritt: Wenn Sie einen Job aktualisieren, werden alle derzeit zwischengespeicherten Datensätze von Dataflow beibehalten und im Ersatzjob verarbeitet. Gepufferte Daten können beispielsweise während der Windowing-Auflösung auftreten. Wenn der Ersatzjob eine andere oder eine inkompatible Datencodierung verwendet, können diese Datensätze von Dataflow nicht serialisiert oder deserialisiert werden.
  • "Zustandsorientierten" Vorgang aus der Pipeline entfernen. Wenn Sie zustandsorientierte Vorgänge aus der Pipeline entfernen, besteht der Ersatzjob möglicherweise die Kompatibilitätsprüfung nicht. Dataflow kann im Sinne der Effizienz mehrere Schritte zusammenfassen. Wenn eine zustandsabhängige Operation aus einem zusammengefassten Schritt fehlt, schlägt die Prüfung fehl. Bei "stateful"-Operationen handelt es sich um:

    • Transformationen, die Nebeneingaben erzeugen oder verbrauchen.
    • E/A-Lesevorgänge
    • Transformationen mit verschlüsselten Zuständen
    • Transformationen mit Fensterzusammenführung
  • Zustandsorientierte DoFn-Variablen ändern. Wenn bei Streaming-Jobs Ihre Pipeline zustandsorientierte DoFns enthält, kann das Ändern der zustandsorientierten DoFn-Variablen dazu führen, dass die Pipeline fehlschlägt.

  • Es wird versucht, den Ersatzjob in einer anderen geografischen Zone auszuführen: Führen Sie den Ersatzjob in derselben Zone aus, in der Sie den vorigen Job ausgeführt haben.

Schemas aktualisieren

Apache Beam lässt PCollection zu Schemas mit benannten Feldern zu. In diesem Fall sind keine expliziten Coder erforderlich. Wenn die Feldnamen und -typen für ein bestimmtes Schema unverändert bleiben (einschließlich verschachtelter Felder), führt dieses Schema nicht dazu, dass die Aktualisierungsprüfung fehlschlägt. Die Aktualisierung kann jedoch weiterhin blockiert sein, wenn andere Segmente der neuen Pipeline nicht kompatibel sind.

Schemas entwickeln

Häufig ist es erforderlich, das Schema einer PCollection aufgrund von sich entwickelnden Geschäftsanforderungen zu entwickeln. Der Dataflow-Dienst ermöglicht beim Aktualisieren der Pipeline die folgenden Änderungen an einem Schema:

  • Ein oder mehrere neue Felder zu einem Schema hinzufügen, einschließlich verschachtelter Felder.
  • Obligatorischer Feldtyp (keine Nullwerte zulässig), der optional sein kann (Nullwerte zulässig).

Das Entfernen von Feldern, Ändern von Feldnamen oder Ändern von Feldtypen ist während der Aktualisierung nicht zulässig.

Zusätzliche Daten an einen vorhandenen ParDo-Vorgang übergeben

Sie können je nach Anwendungsfall zusätzliche (Out-of-Band-)Daten an einen vorhandenen ParDo-Vorgang übergeben. Verwenden Sie dazu je nach Anwendungsfall eine der folgenden Methoden:

  • Informationen als Felder in Ihrer DoFn-Unterklasse serialisieren.
  • Alle Variablen, auf die in den Methoden in einem anonymen DoFn verwiesen wird, werden automatisch serialisiert.
  • Daten innerhalb von DoFn.startBundle() berechnen.
  • Daten mit ParDo.withSideInputs übergeben.

Weitere Informationen finden Sie auf den folgenden Seiten: