Pipelineabhängigkeiten in Dataflow verwalten

Viele Apache Beam-Pipelines können unter Verwendung der standardmäßigen Dataflow-Laufzeitumgebungen ausgeführt werden. Einige Anwendungsfälle für die Datenverarbeitung profitieren jedoch von zusätzlichen Bibliotheken oder Klassen. In diesen Fällen müssen Sie möglicherweise Ihre Pipelineabhängigkeiten verwalten.

Im Folgenden finden Sie einige Gründe, warum Sie Ihre Pipelineabhängigkeiten möglicherweise verwalten müssen:

  • Die von der Standardlaufzeitumgebung bereitgestellten Abhängigkeiten sind für Ihren Anwendungsfall nicht ausreichend.
  • Die Standardabhängigkeiten haben entweder Versionskollisionen oder Klassen und Bibliotheken, die mit Ihrem Pipelinecode nicht kompatibel sind.
  • Sie müssen bestimmte Bibliotheksversionen für Ihre Pipeline anheften.
  • Sie haben eine Python-Pipeline, die mit einem konsistenten Satz von Abhängigkeiten ausgeführt werden muss.

Wie Sie Abhängigkeiten verwalten, hängt davon ab, ob Ihre Pipeline Java, Python oder Go verwendet.

Java

Inkompatible Klassen und Bibliotheken können zu Problemen mit der Java-Abhängigkeit führen. Wenn Ihre Pipeline nutzerspezifischen Code und Einstellungen enthält, darf der Code keine gemischten Bibliothekenversionen enthalten.

Probleme mit Java-Abhängigkeiten

Wenn Ihre Pipeline Probleme mit Java-Abhängigkeiten hat, kann einer der folgenden Fehler auftreten:

  • NoClassDefFoundError: Dieser Fehler tritt auf, wenn eine gesamte Klasse während der Laufzeit nicht verfügbar ist.
  • NoSuchMethodError: Dieser Fehler tritt auf, wenn die Klasse im Klassenpfad eine Version verwendet, die nicht die richtige Methode enthält, oder wenn sich die Methodensignatur geändert hat.
  • NoSuchFieldError: Dieser Fehler tritt auf, wenn die Klasse im Klassenpfad eine Version verwendet, für die während der Laufzeit kein Feld erforderlich ist.
  • FATAL ERROR: Dieser Fehler tritt auf, wenn eine integrierte Abhängigkeit nicht ordnungsgemäß geladen werden kann. Wenn Sie eine Uber-JAR-Datei (Shading) verwenden, nehmen Sie keine Bibliotheken auf, die Signaturen in derselben JAR-Datei verwenden, z. B. Conscrypt.

Abhängigkeitsverwaltung

Zur Vereinfachung der Abhängigkeitsverwaltung für Java-Pipelines verwendet Apache Beam Bill of Materials (BOM)-Artefakte. Die BOM unterstützt die Tools für die Abhängigkeitsverwaltung bei der Auswahl kompatibler Abhängigkeitskombinationen. Weitere Informationen finden Sie unter Apache Beam SDK für Java-Abhängigkeiten in der Apache Beam-Dokumentation.

Wenn Sie eine BOM mit Ihrer Pipeline verwenden und der Abhängigkeitsliste explizit andere Abhängigkeiten hinzufügen möchten, fügen Sie der Datei pom.xml für das SDK-Artefakt die folgenden Informationen hinzu: Verwenden Sie beam-sdks-java-io-google-cloud-platform-bom, um die richtige BOM der Bibliotheken zu importieren.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-google-cloud-platform-bom</artifactId>
      <version>LATEST</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  </dependency>
</dependencies>

Das Artefakt beam-sdks-java-core enthält nur das Core SDK. Sie müssen der Liste der Abhängigkeiten explizit weitere Abhängigkeiten wie E/A und Runner hinzufügen.

Python

Wenn Sie Dataflow-Jobs mit dem Apache Beam Python SDK ausführen, ist die Abhängigkeitsverwaltung in den folgenden Szenarien nützlich:

  • Ihre Pipeline verwendet öffentliche Pakete aus dem Python-Paketindex (PiPy) und Sie möchten diese Pakete remote verfügbar machen.
  • Sie möchten eine reproduzierbare Umgebung erstellen.
  • Um die Startzeit zu verkürzen, sollten Sie die Abhängigkeitsinstallation von den Workern zur Laufzeit vermeiden.

Python-Pipelineabhängigkeiten definieren

Obwohl Sie ein einziges Python-Skript oder -Notebook verwenden können, um eine Apache Beam-Pipeline zu schreiben, wird Software in der Python-Umgebung häufig als Pakete verteilt. Um die Wartung Ihrer Pipeline zu vereinfachen, gruppieren Sie die Pipeline-Dateien als Python-Paket, wenn Ihr Pipeline-Code mehrere Dateien umfasst.

  • Definieren Sie die Abhängigkeiten der Pipeline in der Datei setup.py Ihres Pakets.
  • Stellen Sie das Paket mit der Pipelineoption --setup_file auf den Workern bereit.

Wenn die Remote-Worker gestartet werden, installieren sie Ihr Paket. Ein Beispiel finden Sie unter Juliset im Apache Beam GitHub.

So strukturieren Sie Ihre Pipeline als Python-Paket:

  1. Erstellen Sie eine setup.py-Datei für Ihr Projekt. Fügen Sie in die Datei setup.py das Argument install_requires ein, um den minimalen Satz von Abhängigkeiten für Ihre Pipeline anzugeben. Das folgende Beispiel zeigt eine einfache setup.py-Datei.

    import setuptools
    
    setuptools.setup(
      name='PACKAGE_NAME',
      version='PACKAGE_VERSION',
      install_requires=[],
      packages=setuptools.find_packages(),
    )
    
  2. Fügen Sie dem Stammverzeichnis Ihres Projekts die Datei setup.py, die Hauptworkflowdatei und ein Verzeichnis mit den restlichen Dateien hinzu. Diese Dateigruppierung ist das Python-Paket für Ihre Pipeline. Die Dateistruktur sieht so aus wie das folgende Beispiel:

    root_dir/
      package_name/
        my_pipeline_launcher.py
        my_custom_transforms.py
        ...other files...
      setup.py
      main.py
    
  3. Installieren Sie das Paket in der Übermittlungsumgebung, um Ihre Pipeline auszuführen. Verwenden Sie die Pipelineoption --setup_file, um das Paket auf den Workern bereitzustellen. Beispiel:

    python -m pip install -e .
    python main.py --runner DataflowRunner --setup_file ./setup.py  <...other options...>
    

Diese Schritte vereinfachen die Wartung des Pipelinecodes, insbesondere wenn der Code größer und komplexer wird. Weitere Möglichkeiten zum Angeben von Abhängigkeiten finden Sie unter Python-Pipelineabhängigkeiten verwalten in der Apache Beam-Dokumentation.

Laufzeitumgebung mit benutzerdefinierten Containern steuern

Zum Ausführen einer Pipeline mit dem Apache Beam Python SDK benötigen Dataflow-Worker eine Python-Umgebung, die einen Interpreter, das Apache Beam SDK und die Pipelineabhängigkeiten enthält. Docker-Container-Images bieten die geeignete Umgebung zum Ausführen Ihres Pipelinecodes.

Standard-Container-Images werden mit jeder Version des Apache Beam SDK veröffentlicht und diese Images enthalten die Apache Beam SDK-Abhängigkeiten. Weitere Informationen finden Sie unter Apache Beam SDK für Python-Abhängigkeiten in der Apache Beam-Dokumentation.

Wenn Ihre Pipeline eine Abhängigkeit erfordert, die nicht im Standard-Container-Image enthalten ist, muss die Abhängigkeit zur Laufzeit installiert werden. Die Installation von Paketen zur Laufzeit kann folgende Konsequenzen haben:

  • Die Startzeit der Worker erhöht sich aufgrund der Auflösung, des Downloads und der Installation von Abhängigkeiten.
  • Für die Ausführung der Pipeline ist eine Internetverbindung erforderlich.
  • Nicht-Determinismus tritt aufgrund von Softwarereleases in Abhängigkeiten auf.

Stellen Sie die Laufzeitumgebung in einem benutzerdefinierten Docker-Container-Image bereit, um diese Probleme zu vermeiden. Die Verwendung eines benutzerdefinierten Docker-Container-Images, auf dem die Pipelineabhängigkeiten vorinstalliert sind, bietet die folgenden Vorteile:

  • Stellt sicher, dass die Pipeline-Laufzeitumgebung bei jedem Start Ihres Dataflow-Jobs die gleichen Abhängigkeiten hat.
  • Ermöglicht die Steuerung der Laufzeitumgebung Ihrer Pipeline.
  • Vermeidet die potenziell zeitaufwendige Auflösung von Abhängigkeiten beim Start.

Beachten Sie bei der Verwendung benutzerdefinierter Container-Images die folgenden Hinweise:

  • Verwenden Sie nicht das Tag :latest für Ihre benutzerdefinierten Images. Taggen Sie Ihre Builds mit einem Datum, einer Version oder einer eindeutigen Kennzeichnung. Mit diesem Schritt können Sie bei Bedarf zu einer als funktionierend bekannten Konfiguration zurückkehren.
  • Verwenden Sie eine Startumgebung, die mit Ihrem Container-Image kompatibel ist. Weitere Informationen zur Verwendung von benutzerdefinierten Containern finden Sie unter Container-Image erstellen.

Weitere Informationen zum Vorinstallieren von Python-Abhängigkeiten finden Sie unter Python-Abhängigkeiten vorinstallieren.

Startumgebung mit Dataflow-Vorlagen steuern

Wenn Ihre Pipeline zusätzliche Abhängigkeiten erfordert, müssen Sie diese möglicherweise sowohl in der Laufzeitumgebung als auch in der Startumgebung installieren. In der Startumgebung wird die Produktionsversion der Pipeline ausgeführt. Da die Startumgebung mit der Laufzeitumgebung kompatibel sein muss, verwenden Sie in beiden Umgebungen dieselben Versionen von Abhängigkeiten.

Verwenden Sie flexible Dataflow-Vorlagen, um eine containerisierte, reproduzierbare Startumgebung zu haben. Weitere Informationen finden Sie unter Flexible Vorlage erstellen und ausführen. Berücksichtigen Sie bei der Verwendung von flexiblen Vorlagen die folgenden Faktoren:

Diese Konstruktion macht die Startumgebung sowohl reproduzierbar als auch mit Ihrer Laufzeitumgebung kompatibel.

Ein Beispiel für diesen Ansatz finden Sie in der Anleitung Flex-Vorlage für eine Pipeline mit Abhängigkeiten und einem benutzerdefinierten Container in GitHub.

Weitere Informationen finden Sie unter Startumgebung mit der Laufzeitumgebung kompatibel machen und Von der Pipeline verwendete Abhängigkeiten steuern in der Apache-Datei Beam-Dokumentation.

Einfach loslegen (Go)

Wenn Sie Dataflow-Jobs mit dem Apache Beam Go SDK ausführen, werden Go-Module zum Verwalten von Abhängigkeiten verwendet. Die folgende Datei enthält die Standardkompilierungs- und Laufzeitabhängigkeiten, die von Ihrer Pipeline verwendet werden:

https://raw.githubusercontent.com/apache/beam/vVERSION_NUMBER/sdks/go.sum

Ersetzen Sie VERSION_NUMBER durch die von Ihnen verwendete SDK-Version.

Informationen zum Verwalten von Abhängigkeiten für Ihre Go-Pipeline finden Sie unter Abhängigkeiten verwalten in der Go-Dokumentation.