Vorhandene Pipeline aktualisieren

In diesem Dokument wird beschrieben, wie Sie einen laufenden Streamingjob aktualisieren. Sie können Ihren vorhandenen Dataflow-Job aus folgenden Gründen aktualisieren:

  • Sie möchten den Pipelinecode verbessern oder erweitern.
  • Sie möchten Programmfehler im Pipelinecode korrigieren.
  • Sie möchten die Pipeline aktualisieren, um Änderungen des Datenformats, Versionsänderungen oder andere Änderungen in der Datenquelle zu berücksichtigen.
  • Sie möchten eine Sicherheitslücke in Bezug auf Container-Optimized OS für alle Dataflow-Worker beheben.
  • Sie möchten eine Apache Beam-Streamingpipeline skalieren, um eine andere Anzahl von Workern zu verwenden.

Es gibt zwei Möglichkeiten, Jobs zu aktualisieren:

  • Aktualisierung von In-Flight-Jobs: Bei Streamingjobs, die die Streaming Engine nutzen, können Sie die Joboptionen min-num-workers und max-num-workers aktualisieren, ohne den Job zu beenden oder die Job-ID zu ändern.
  • Ersatzjob: Starten Sie einen neuen Job, der den vorhandenen Job ersetzt, um den aktualisierten Pipelinecode auszuführen oder um Joboptionen zu aktualisieren, die von Aktualisierungen laufender Jobs nicht unterstützt werden. Vor dem Starten des neuen Jobs müssen Sie die Jobgrafik validieren, um zu prüfen, ob ein Ersatzjob gültig ist.

Wenn Sie Ihren Job aktualisieren, führt der Dataflow-Dienst eine Prüfung der Kompatibilität zwischen dem derzeit ausgeführten und dem potenziellen Ersatzjob durch. Die Kompatibilitätsprüfung gewährleistet, dass beispielsweise Zwischenzustandsinformationen und gepufferte Daten vom vorigen Job auf den Ersatzjob übertragen werden können.

Sie können auch die integrierte Logging-Infrastruktur des Apache Beam SDK verwenden, um Informationen beim Aktualisieren Ihres Jobs zu protokollieren. Weitere Informationen finden Sie unter Mit Pipelinelogs arbeiten. Verwenden Sie die Logging-Ebene DEBUG, um Probleme mit dem Pipelinecode zu identifizieren.

Aktualisierung der Option für In-Flight-Jobs

Bei einem Streamingjob, der Streaming Engine verwendet, können Sie die folgenden Joboptionen aktualisieren, ohne den Job zu beenden oder die Job-ID zu ändern:

  • min-num-workers: Die Mindestanzahl von Compute Engine-Instanzen.
  • max-num-workers: Die maximale Anzahl von Compute Engine-Instanzen.

Bei anderen Jobaktualisierungen müssen Sie den aktuellen Job durch den aktualisierten Job ersetzen. Weitere Informationen finden Sie unter Ersatzjob starten.

Aktualisierung während der Übertragung durchführen

Führen Sie die folgenden Schritte aus, um eine laufende Joboption zu aktualisieren.

gcloud

Führen Sie den Befehl gcloud dataflow jobs update-options aus:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Ersetzen Sie Folgendes:

  • REGION: die ID der Region des Jobs
  • JOB_ID: die ID des zu aktualisierenden Jobs

Sie können --min-num-workers und --max-num-workers auch einzeln aktualisieren.

REST

Verwenden Sie die Methode projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: Die Google Cloud-Projekt-ID des Dataflow-Jobs
  • REGION: die ID der Region des Jobs
  • JOB_ID: die ID des zu aktualisierenden Jobs
  • MINIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.
  • MAXIMUM_WORKERS: Die Mindestanzahl von Compute Engine-Instanzen.

Sie können min_num_workers und max_num_workers auch einzeln aktualisieren. Geben Sie im updateMask-Abfrageparameter an, welche Parameter aktualisiert werden sollen, und fügen Sie die aktualisierten Werte in das Feld runtimeUpdatableParams des Anfragetexts ein. Im folgenden Beispiel wird min_num_workers aktualisiert:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Ein Job muss sich im Status „Wird ausgeführt“ befinden, damit In-Flight-Updates unterstützt werden. Wenn der Job nicht gestartet wurde oder abgebrochen wurde, tritt ein Fehler auf. Wenn Sie einen Ersatzjob starten, warten Sie, bis er ausgeführt wird, bevor Sie laufende Aktualisierungen an den neuen Job senden.

Nachdem Sie eine Aktualisierungsanfrage gesendet haben, warten Sie, bis die Anfrage abgeschlossen ist, bevor Sie eine weitere Aktualisierung senden. In den Joblogs sehen Sie, wann die Anfrage abgeschlossen ist.

Ersatzjob validieren

Prüfen Sie die relevante Jobgrafik, bevor Sie den neuen Job starten, um zu prüfen, ob ein Ersatzjob gültig ist. In Dataflow ist eine Jobgrafik eine grafische Darstellung einer Pipeline. Durch die Validierung der Jobgrafik verringern Sie das Risiko, dass in der Pipeline nach der Aktualisierung Fehler oder ein Pipelineversagen auftreten. Außerdem können Sie Aktualisierungen validieren, ohne den ursprünglichen Job beenden zu müssen, sodass keine Ausfallzeiten für den Job auftreten.

Führen Sie die Schritte zum Starten eines Ersatzjobs aus, um die Jobgrafik zu validieren. Fügen Sie die Dataflow-Dienstoption graph_validate_only in den Aktualisierungsbefehl ein.

Java

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --jobName in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflowServiceOptions=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transformNameMapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Python

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --job_name in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflow_service_options=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Einfach loslegen (Go)

  • Übergeben Sie die Option --update.
  • Legen Sie für die Option --job_name denselben Namen wie für den zu aktualisierenden Job fest.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Dienstoption --dataflow_service_options=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.

gcloud

Verwenden Sie den Befehl gcloud dataflow flex-template run mit der Option additional-experiments, um die Jobgrafik für einen Job mit flexibler Vorlage zu validieren:

  • Übergeben Sie die Option --update.
  • Legen Sie für JOB_NAME denselben Namen wie für den Job fest, den Sie aktualisieren möchten.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Fügen Sie die Option --additional-experiments=graph_validate_only ein.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform-name-mappings übergeben.

Beispiel:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

Ersetzen Sie JOB_NAME durch den Namen des Jobs, den Sie aktualisieren möchten.

REST

Verwenden Sie das Feld additionalExperiments im Objekt FlexTemplateRuntimeEnvironment (flexible Vorlagen) oder RuntimeEnvironment.

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

Die Dienstoption graph_validate_only validiert nur Pipelineaktualisierungen. Verwenden Sie diese Option nicht, wenn Sie Pipelines erstellen oder starten. Zum Aktualisieren einer Pipeline starten Sie einen Ersatzjob ohne die Dienstoption graph_validate_only.

Wenn die Validierung des Jobdiagramms erfolgreich ist, zeigen Jobstatus und Joblogs die folgenden Statuswerte an:

  • Der Jobstatus ist JOB_STATE_DONE.
  • In der Google Cloud Console lautet der Jobstatus Succeeded.
  • In den Joblogs wird die folgende Meldung angezeigt:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

Wenn die Validierung des Jobdiagramms fehlschlägt, zeigen Jobstatus und Joblogs die folgenden Statuswerte an:

  • Der Jobstatus ist JOB_STATE_FAILED.
  • In der Google Cloud Console lautet der Jobstatus Failed.
  • In den Joblogs wird eine Meldung angezeigt, die den Inkompatibilitätsfehler beschreibt. Der Nachrichteninhalt ist vom Fehler abhängig.

Ersatzjob starten

Sie können einen vorhandenen Job aus folgenden Gründen ersetzen:

  • Aktualisierten Pipelinecode ausführen.
  • So aktualisieren Sie Joboptionen, die In-Flight-Updates nicht unterstützen:

Prüfen Sie die relevante Jobgrafik, bevor Sie den neuen Job starten, um zu prüfen, ob ein Ersatzjob gültig ist.

Legen Sie beim Starten eines Ersatzjobs die folgenden Pipelineoptionen fest, um den Aktualisierungsvorgang zusätzlich zu den regulären Optionen des Jobs auszuführen:

Java

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --jobName in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transformNameMapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Python

  • Übergeben Sie die Option --update.
  • Setzen Sie die Option --job_name in PipelineOptions auf den Namen des zu aktualisierenden Jobs.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.
  • Wenn Sie einen Ersatzjob senden, der eine neuere Version des Apache Beam SDK verwendet, setzen Sie --updateCompatibilityVersion auf die Apache Beam SDK-Version, die im ursprünglichen Job verwendet wurde.

Einfach loslegen (Go)

  • Übergeben Sie die Option --update.
  • Legen Sie für die Option --job_name denselben Namen wie für den zu aktualisierenden Job fest.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform_name_mapping übergeben.

gcloud

Verwenden Sie den Befehl gcloud dataflow flex-template run, um einen flexiblen Vorlagenjob mit der gcloud CLI zu aktualisieren. Die Aktualisierung anderer Jobs über die gcloud CLI wird nicht unterstützt.

  • Übergeben Sie die Option --update.
  • Legen Sie für JOB_NAME denselben Namen wie für den Job fest, den Sie aktualisieren möchten.
  • Setzen Sie die Option --region auf die Region des Jobs, den Sie aktualisieren möchten.
  • Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit der Option --transform-name-mappings übergeben.

REST

In dieser Anleitung wird gezeigt, wie Sie Jobs ohne Vorlagen mit der REST API aktualisieren. Informationen zum Aktualisieren eines Jobs mit klassischer Vorlage über die REST API finden Sie unter Streamingjob aus benutzerdefinierter Vorlage aktualisieren. Informationen zum Aktualisieren eines Jobs mit flexibler Vorlage mit der REST API finden Sie unter Job mit flexibler Vorlage aktualisieren.

  1. Rufen Sie die Ressource job für den Job ab, den Sie ersetzen möchten, indem Sie die Methode projects.locations.jobs.get verwenden. Verwenden Sie den Abfrageparameter view mit dem Wert JOB_VIEW_DESCRIPTION. Wenn Sie JOB_VIEW_DESCRIPTION verwenden, wird die Datenmenge in der Antwort begrenzt, sodass Ihre nachfolgende Anfrage die Größenbeschränkungen nicht überschreitet. Wenn Sie detailliertere Jobinformationen benötigen, verwenden Sie den Wert JOB_VIEW_ALL.

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    Ersetzen Sie die folgenden Werte:

    • PROJECT_ID: Die Google Cloud-Projekt-ID des Dataflow-Jobs
    • REGION: Die Region des Jobs, den Sie aktualisieren möchten
    • JOB_ID: Die Job-ID des Jobs, den Sie aktualisieren möchten
  2. Verwenden Sie zum Aktualisieren des Feldes die Methode projects.locations.jobs.create. Verwenden Sie im Anfragetext die abgerufene job-Ressource.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    Ersetzen Sie Folgendes:

    • JOB_ID: Die Job-ID des Jobs, den Sie aktualisieren möchten.
    • JOB_NAME: Der Jobname des Jobs, den Sie aktualisieren möchten.

    Sollten sich Transformationsnamen in Ihrer Pipeline geändert haben, müssen Sie eine Transformationszuordnung mit dem Feld transformNameMapping übergeben.

  3. Optional: Zum Senden der Anfrage mit curl (Linux, macOS oder Cloud Shell) speichern Sie die Anfrage in einer JSON-Datei und führen Sie dann den folgenden Befehl aus:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    Ersetzen Sie FILE_PATH durch den Pfad zur JSON-Datei, die den Anfragetext enthält.

Namen des Ersatzjobs angeben

Java

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --jobName übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

Python

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --job_name übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

Einfach loslegen (Go)

Wenn Sie den Ersatzjob starten, muss der Wert, den Sie für die Option --job_name übergeben, exakt mit dem Namen des zu ersetzenden Jobs übereinstimmen.

gcloud

Wenn Sie den Ersatzjob starten, muss JOB_NAME genau mit dem Namen des zu ersetzenden Jobs übereinstimmen.

REST

Legen Sie den Wert des Felds replaceJobId auf die Job-ID des zu aktualisierenden Jobs fest. Wählen Sie den vorigen Job in der Dataflow-Monitoring-Oberfläche aus, um den richtigen Jobnamen zu ermitteln. Suchen Sie dann in der Seitenleiste Jobinfo das Feld Job-ID.

Den richtigen Wert für den Jobnamen finden Sie, wenn Sie in der Dataflow-Monitoring-Oberfläche den vorherigen Job auswählen. Suchen Sie dann in der Seitenleiste Jobinfo das Feld Jobname:

Die Seitenleiste „Jobdetails“ für einen laufenden Dataflow-Job.
Abbildung 1: Die Seitenleiste „Jobdetails“ für einen laufenden Dataflow-Job mit dem Feld „Jobname“.

Alternativ können Sie die Liste der vorhandenen Jobs über die Dataflow-Befehlszeilenschnittstelle abfragen. Geben Sie den Befehl gcloud dataflow jobs list in Ihr Shell- oder Terminal-Fenster ein, um eine Liste der Dataflow-Jobs in Ihrem Google Cloud-Projekt zu erhalten, und suchen Sie nach dem NAME für den Job, den Sie ersetzen möchten:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Transformationszuordnung erstellen

Falls die Namen der Transformationen in der Ersatz-Pipeline von jenen in der vorigen Pipeline abweichen, benötigt der Dataflow-Dienst eine Transformationszuordnung. Die Transformationszuordnung dient dazu, die benannten Transformationen des vorigen Pipelinecodes den Namen im neuen Pipelinecode zuzuordnen.

Java

Übergeben Sie die Zuordnung mithilfe der Befehlszeilenoption --transformNameMapping im folgenden Format:

--transformNameMapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transformNameMapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transformNameMapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

Übergeben Sie die Zuordnung mithilfe der Befehlszeilenoption --transform_name_mapping im folgenden Format:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform_name_mapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform_name_mapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Einfach loslegen (Go)

Übergeben Sie die Zuordnung mithilfe der Befehlszeilenoption --transform_name_mapping im folgenden Format:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform_name_mapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform_name_mapping müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

Übergeben Sie die Zuordnung mit der Option --transform-name-mappings im folgenden Format:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Beachten Sie, dass Sie in --transform-name-mappings nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Hinweis: Bei der Ausführung mit --transform-name-mappings müssen Anführungszeichen gemäß den Anforderungen Ihrer Shell gegebenenfalls mit Escapezeichen versehen werden. Zum Beispiel in Bash:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

Übergeben Sie die Zuordnung mit dem Feld transformNameMapping im folgenden Format:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

Beachten Sie, dass Sie in transformNameMapping nur Zuordnungseinträge für Transformationsnamen angeben müssen, die zwischen der vorherigen Pipeline und der Ersatzpipeline geändert wurden.

Transformationsnamen bestimmen

Der Transformationsname in einer Instanz in der Zuordnung ist der Name, den Sie bei der Anwendung der Transformation in Ihrer Pipeline angegeben haben. Beispiel:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Einfach loslegen (Go)

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

Außerdem können Sie die Transformationsnamen des vorigen Jobs durch Untersuchung des Ausführungsdiagramms dieses Jobs in der Dataflow-Überwachungsoberfläche ermitteln:

Ausführungsgrafik für eine WordCount-Pipeline.
Abbildung 2: Die Ausführungsgrafik für eine WordCount-Pipeline, wie sie in der Dataflow Monitoring-Oberfläche angezeigt wird.

Benennung von zusammengesetzten Transformationen

Transformationsnamen sind in Abhängigkeit von der Hierarchie in der Pipeline hierarchisch aufgebaut. Wenn eine Pipeline über eine zusammengesetzte Transformation verfügt, werden die eingebetteten Transformationen mit Bezug auf die sie enthaltende Transformation benannt. Nehmen wir beispielsweise an, Ihre Pipeline enthält eine zusammengesetzte Transformation namens CountWidgets, die wiederum eine Transformation namens Parse enthält. Der vollständige Name der Transformation ist CountWidgets/Parse. Sie müssen den vollständigen Namen in der Transformationszuordnung angeben.

Wenn die neue Pipeline eine zusammengesetzte Transformation einem anderen Namen zuordnet, werden auch alle darin verschachtelten Transformationen automatisch umbenannt. Sie müssen die neuen Namen für die inneren Transformationen in der Transformationszuordnung angeben.

Transformationshierarchie refaktorieren

Wenn die Hierarchie der Ersatzpipeline von jener der vorigen Pipeline abweicht, müssen Sie die Zuordnung ausdrücklich deklarieren. Möglicherweise haben Sie eine andere Transformationshierarchie, da Sie Ihre zusammengesetzten Transformationen refaktoriert haben oder Ihre Pipeline von einer zusammengesetzten Transformation aus einer Bibliothek abhängt.

Beispiel: Die vorherige Pipeline hat die zusammengesetzte Transformation CountWidgets angewendet, die eine Transformation namens Parse enthielt. Die Ersatzpipeline refaktoriert CountWidgets und verschachtelt Parse in eine andere Transformation namens Scan. Damit die Aktualisierung gelingen kann, müssen Sie den vollständigen Transformationsnamen in der vorherigen Pipeline (CountWidgets/Parse) ausdrücklich dem Transformationsnamen in der neuen Pipeline (CountWidgets/Scan/Parse) zuordnen:

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform_name_mapping={"CountWidgets/Parse":""}

Einfach loslegen (Go)

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

Wenn Sie eine Transformation ganz aus der Ersatzpipeline löschen, müssen Sie eine Null-Zuordnung angeben. Nehmen wir an, aus der Ersatzpipeline wird die Transformation CountWidgets/Parse vollkommen entfernt:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

Auswirkungen des Ersetzens eines Jobs

Wenn Sie einen vorhandenen Job ersetzen, wird der aktualisierte Pipelinecode durch einen neuen Job ausgeführt. Der Dataflow-Dienst behält den Jobnamen bei, führt den Ersatzjob aber mit einer aktualisierten Job-ID aus. Dieser Vorgang kann zu Ausfallzeiten führen, während der vorhandene Job angehalten wird, die Kompatibilitätsprüfung ausgeführt wird und der neue Job gestartet wird.

Der Ersatzjob behält folgende Elemente bei:

Zwischenzustandsdaten

Daten im Zwischenzustand aus dem vorherigen Job bleiben erhalten. Statusdaten enthalten keine speicherinternen Caches. Wenn Sie bei der Aktualisierung Ihrer Pipeline die Cache-Daten beibehalten möchten, refaktorieren Sie Ihre Pipeline so, dass Caches in Statusdaten oder nach Nebeneingaben umgewandelt werden. Weitere Informationen zur Verwendung von Nebeneingaben finden Sie in der Apache Beam-Dokumentation unter Nebeneingabemuster.

Streamingpipelines haben Größenbeschränkungen für ValueState und für Nebeneingaben. Wenn Sie große Caches haben, die Sie behalten möchten, müssen Sie möglicherweise externen Speicher wie Memorystore oder Bigtable verwenden.

In-Flight-Daten

In-Flight-Daten werden durch die Transformationen in der neuen Pipeline verarbeitet. Ob zusätzliche Transformationen, die Sie dem Ersatz-Pipelinecode hinzufügen, wirksam werden, hängt jedoch davon ab, wo die Datensätze gepuffert werden. In diesem Beispiel enthält Ihre vorhandene Pipeline die folgenden Transformationen:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Einfach loslegen (Go)

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

Sie können Ihren Job folgendermaßen durch neuen Pipelinecode ersetzen:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Einfach loslegen (Go)

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Selbst wenn Sie eine Transformation zum Herausfiltern von Strings, die mit dem Buchstaben "A" beginnen, hinzugefügen, sieht die nächste Transformation (FormatStrings) möglicherweise trotzdem gepufferte oder in Übertragung befindliche (In-Flight-)Strings des vorigen Jobs, die mit "A" beginnen.

Windowing ändern

Sie können die Windowing- und Trigger-Strategien für die PCollection-Objekte in der Ersatzpipeline ändern. Dabei ist jedoch Vorsicht geboten. Eine Änderung der Windowing- oder Trigger-Strategien wirkt sich nicht auf bereits gepufferte oder anderweitig in Übertragung befindliche Daten aus.

Es empfiehlt sich, nur geringfügige Änderungen am Windowing der Pipeline vorzunehmen, z. B. eine Änderung der Dauer von festen oder fließenden Zeitfenstern. Durch größere Änderungen an Windowing- oder Trigger-Strategien, z. B. eine Änderung des Windowing-Algorithmus, kann die Pipelineausgabe unvorhersehbare Ergebnisse liefern.

Kompatibilitätsprüfung von Jobs

Wenn Sie Ihren Ersatzjob starten, führt der Dataflow-Dienst eine Prüfung der Kompatibilität zwischen dem Ersatzjob und dem vorigen Job durch. Bei Bestehen der Kompatibilitätsprüfung wird der vorige Job angehalten. Der Ersatzjob startet dann unter demselben Jobnamen im Dataflow-Dienst. Bei Nichtbestehen der Kompatibilitätsprüfung wird der vorige Job weiterhin im Dataflow-Dienst ausgeführt und der Ersatzjob gibt einen Fehler zurück.

Java

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie pipeline.run().waitUntilFinish() in Ihrem Pipelinecode.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Alternativ können Sie den Status des Ersatzjobs in der Monitoring-Oberfläche von Dataflow überwachen. Wenn Ihr Job erfolgreich gestartet wurde, hat er auch die Kompatibilitätsprüfung bestanden.

Python

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie pipeline.run().wait_until_finish() in Ihrem Pipelinecode.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Alternativ können Sie den Status des Ersatzjobs in der Monitoring-Oberfläche von Dataflow überwachen. Wenn Ihr Job erfolgreich gestartet wurde, hat er auch die Kompatibilitätsprüfung bestanden.

Einfach loslegen (Go)

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Insbesondere müssen Sie die nicht blockierende Ausführung mit dem Flag --execute_async oder --async einstellen. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update und ohne das Flag --execute_async oder --async aus.
  2. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich besteht.
  3. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

gcloud

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  1. Verwenden Sie bei Java-Pipelines pipeline.run().waitUntilFinish() in Ihrem Pipelinecode. Verwenden Sie bei Python-Pipelines pipeline.run().wait_until_finish() in Ihrem Pipelinecode. Führen Sie bei Go-Pipelines die Schritte auf dem Tab „Go“ aus.
  2. Führen Sie Ihr Ersatz-Pipelineprogramm mit der Option --update aus.
  3. Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich bestanden hat.
  4. Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

REST

Aufgrund einer Einschränkung müssen Sie die Blockierung der Ausführung verwenden, damit fehlgeschlagene Aktualisierungsfehler in der Konsole oder dem Terminal angezeigt werden. Mit folgenden Schritten kann das Problem derzeit umgangen werden:

  • Verwenden Sie bei Java-Pipelines pipeline.run().waitUntilFinish() in Ihrem Pipelinecode. Verwenden Sie bei Python-Pipelines pipeline.run().wait_until_finish() in Ihrem Pipelinecode. Führen Sie bei Go-Pipelines die Schritte auf dem Tab „Go“ aus.
  • Führen Sie Ihr Ersatz-Pipelineprogramm mit dem Feld replaceJobId aus.
  • Warten Sie, bis der Ersatzjob die Kompatibilitätsprüfung erfolgreich besteht.
  • Beenden Sie den Blocking-Ausführungsprozess durch Eingabe von Ctrl+C.

Durch die Kompatibilitätsprüfung wird gewährleistet, dass Dataflow die Zwischenzustandsdaten aus den Schritten des vorigen Jobs an den Ersatzjob übertragen kann. Außerdem sorgt die Kompatibilitätsprüfung dafür, dass die PCollection-Objekte in der Pipeline dieselben Coder verwenden. Wenn Sie einen Coder ändern, kann die Kompatibilitätsprüfung fehlschlagen, da etwaige In-Flight-Daten oder gepufferte Datensätze in der Ersatzpipeline möglicherweise nicht richtig serialisiert werden.

Kompatibilitätsbrüche verhindern

Bestimmte Unterschiede zwischen der vorigen und der Ersatzpipeline können zum Nichtbestehen der Kompatibilitätsprüfung führen. Zu diesen Unterschieden gehören:

  • Änderung des Pipelinediagramms ohne Angabe einer Zuordnung: Wenn Sie einen Job aktualisieren, wird von Dataflow versucht, die Transformationen aus dem vorigen und dem Ersatzjob einander zuzuordnen. Dieser Abgleichsprozess hilft Dataflow, für jeden Schritt Daten in einem Zwischenzustand zu übertragen. Wenn Sie Schritte umbenennen oder entfernen, müssen Sie eine Transformationszuordnung bereitstellen, damit Dataflow Zustandsdaten entsprechend zuordnen kann.
  • Änderung der Nebeneingaben für einen Schritt: Wenn Sie einer Transformation in der Ersatzpipeline Nebeneingaben hinzufügen oder sie aus ihr entfernen, schlägt die Kompatibilitätsprüfung fehl.
  • Änderung des Codes für einen Schritt: Wenn Sie einen Job aktualisieren, werden etwaige zwischengespeicherte Datensätze in Dataflow beibehalten und im Ersatzjob verarbeitet. Zwischengespeicherte Daten können beispielsweise auftreten, während Windowing aufgelöst wird. Wenn der Ersatzjob eine andere oder eine inkompatible Datencodierung verwendet, können diese Datensätze von Dataflow nicht serialisiert oder deserialisiert werden.
  • "Zustandsorientierten" Vorgang aus der Pipeline entfernen. Wenn Sie zustandsorientierte Vorgänge aus der Pipeline entfernen, schlägt der Ersatzjob möglicherweise die Kompatibilitätsprüfung fehl. Dataflow kann mehrere Effizienzen zusammenfassen. Wenn eine zustandsabhängige Operation aus einem zusammengefassten Schritt fehlt, schlägt die Prüfung fehl. Bei "stateful"-Operationen handelt es sich um:

    • Transformationen, die Nebeneingaben erzeugen oder verbrauchen.
    • E/A-Lesevorgänge
    • Transformationen mit verschlüsselten Zuständen
    • Transformationen mit Fensterzusammenführung
  • Zustandsorientierte DoFn-Variablen ändern. Wenn bei Streaming-Jobs Ihre Pipeline zustandsorientierte DoFns enthält, kann das Ändern der zustandsorientierten DoFn-Variablen dazu führen, dass die Pipeline fehlschlägt.

  • Es wird versucht, den Ersatzjob in einer anderen geografischen Zone auszuführen: Führen Sie den Ersatzjob in derselben Zone aus, in der Sie den vorherigen Job ausgeführt haben.

Schemas aktualisieren

Apache Beam lässt PCollection zu Schemas mit benannten Feldern zu. In diesem Fall sind keine expliziten Coder erforderlich. Wenn die Feldnamen und -typen für ein bestimmtes Schema unverändert bleiben (einschließlich verschachtelter Felder), führt dieses Schema nicht dazu, dass die Aktualisierungsprüfung fehlschlägt. Die Aktualisierung kann jedoch weiterhin blockiert werden, wenn andere Segmente der neuen Pipeline nicht kompatibel sind.

Schemas entwickeln

Häufig ist es erforderlich, das Schema einer PCollection aufgrund von sich entwickelnden Geschäftsanforderungen zu entwickeln. Der Dataflow-Dienst ermöglicht beim Aktualisieren der Pipeline die folgenden Änderungen an einem Schema:

  • Ein oder mehrere neue Felder zu einem Schema hinzufügen, einschließlich verschachtelter Felder.
  • Obligatorischer Feldtyp (keine Nullwerte zulässig), der optional sein kann (Nullwerte zulässig).

Das Entfernen von Feldern, Ändern von Feldnamen oder Ändern von Feldtypen ist während der Aktualisierung nicht zulässig.

Zusätzliche Daten an einen vorhandenen ParDo-Vorgang übergeben

Sie können je nach Anwendungsfall zusätzliche (Out-of-Band-)Daten an einen vorhandenen ParDo-Vorgang übergeben. Verwenden Sie dazu je nach Anwendungsfall eine der folgenden Methoden:

  • Serialisieren Sie Informationen als Felder in Ihrer DoFn-Unterklasse.
  • Alle Variablen, auf die in den Methoden in einem anonymen DoFn verwiesen wird, werden automatisch serialisiert.
  • Daten innerhalb eines DoFn.startBundle() berechnen.
  • Übergeben Sie Daten mit ParDo.withSideInputs.

Weitere Informationen finden Sie auf den folgenden Seiten: