Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie mit Cloud Composer 2 Dataproc-Serverless-Nutzlasten aufGoogle Cloudausführen.
In den folgenden Abschnitten finden Sie Beispiele für die Verwendung von Operatoren zum Verwalten von Dataproc Serverless-Batcharbeitslasten. Sie verwenden diese Operatoren in DAGs, mit denen eine serverlose Dataproc-Spark-Batcharbeitslast erstellt, gelöscht, aufgelistet und abgerufen wird:
Erstellen Sie DAGs für Operatoren, die mit serverlosen Dataproc-Batch-Arbeitslasten funktionieren:
DAGs erstellen, die benutzerdefinierte Container und Dataproc Metastore verwenden
Konfigurieren Sie den Persistent History Server für diese DAGs.
Hinweis
Aktivieren Sie die Dataproc API:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Wählen Sie den Speicherort für die Batcharbeitslastdatei aus. Sie haben folgende Möglichkeiten:
- Erstellen Sie einen Cloud Storage-Bucket, in dem diese Datei gespeichert wird.
- Verwenden Sie den Bucket Ihrer Umgebung. Da Sie diese Datei nicht mit Airflow synchronisieren müssen, können Sie einen separaten Unterordner außerhalb der Ordner
/dags
oder/data
erstellen. Beispiel:/batches
- Verwenden Sie einen vorhandenen Bucket.
Dateien und Airflow-Variablen einrichten
In diesem Abschnitt wird gezeigt, wie Sie Dateien einrichten und Airflow-Variablen für diese Anleitung konfigurieren.
Dataproc Serverless Spark ML-Arbeitslastdatei in einen Bucket hochladen
In der Arbeitslast in dieser Anleitung wird ein PySpark-Script ausgeführt:
Speichern Sie ein PySpark-Script in einer lokalen Datei mit dem Namen
spark-job.py
. Sie können beispielsweise das Beispiel-PySpark-Script verwenden.Laden Sie die Datei an den Speicherort hoch, den Sie unter Vorab ausgewählt haben.
Airflow-Variablen festlegen
In den folgenden Abschnitten werden Airflow-Variablen verwendet. Sie legen Werte für diese Variablen in Airflow fest, damit Ihr DAG-Code auf diese Werte zugreifen kann.
In den Beispielen in dieser Anleitung werden die folgenden Airflow-Variablen verwendet. Sie können sie je nach Beispiel nach Bedarf festlegen.
Legen Sie die folgenden Airflow-Variablen für die Verwendung in Ihrem DAG-Code fest:
project_id
: Projekt-ID.bucket_name
: URI eines Buckets, in dem sich die Haupt-Python-Datei der Arbeitslast (spark-job.py
) befindet. Sie haben diesen Speicherort unter Vorbereitung ausgewählt.phs_cluster
: Name des Clusters des Persistent History-Servers. Sie legen diese Variable fest, wenn Sie einen Persistent History Server erstellen.image_name
: Name und Tag des benutzerdefinierten Container-Images (image:tag
). Sie legen diese Variable fest, wenn Sie ein benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden.metastore_cluster
: Name des Dataproc Metastore-Dienstes. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.region_name
: Region, in der sich der Dataproc Metastore-Dienst befindet. Sie legen diese Variable fest, wenn Sie den Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden.
Jede Airflow-Variable über die Google Cloud Console und die Airflow-Benutzeroberfläche festlegen
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Link Airflow für Ihre Umgebung. Die Airflow-Benutzeroberfläche wird geöffnet.
Wählen Sie in der Airflow-Benutzeroberfläche Verwaltung > Variablen aus.
Klicken Sie auf Neuen Eintrag hinzufügen.
Geben Sie den Namen der Variablen im Feld Key und den Wert für sie im Feld Val an.
Klicken Sie auf Speichern.
Persistent History Server erstellen
Verwenden Sie einen Persistent History Server (PHS), um Spark-Verlaufsdateien Ihrer Batch-Arbeitslasten aufzurufen:
- Erstellen Sie einen Persistent History Server.
- Achten Sie darauf, dass Sie den Namen des PHS-Clusters in der
phs_cluster
Airflow-Variable angegeben haben.
DataprocCreateBatchOperator
Der folgende DAG startet eine serverlose Dataproc-Batcharbeitslast.
Weitere Informationen zu DataprocCreateBatchOperator
-Argumenten finden Sie im Quellcode des Operators.
Weitere Informationen zu Attributen, die du im Parameter batch
von DataprocCreateBatchOperator
übergeben kannst, findest du in der Beschreibung der Batch-Klasse.
Benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden
Das folgende Beispiel zeigt, wie Sie Ihre Arbeitslasten mit einem benutzerdefinierten Container-Image ausführen. Sie können einen benutzerdefinierten Container beispielsweise verwenden, um Python-Abhängigkeiten hinzuzufügen, die nicht vom Standard-Container-Image bereitgestellt werden.
So verwenden Sie ein benutzerdefiniertes Container-Image:
Erstellen Sie ein benutzerdefiniertes Container-Image und laden Sie es in Container Registry hoch.
Geben Sie das Bild in der
image_name
Airflow-Variablen an.Verwenden Sie DataprocCreateBatchOperator mit Ihrem benutzerdefinierten Image:
Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden
So verwenden Sie einen Dataproc Metastore-Dienst über einen DAG:
Prüfen Sie, ob Ihr Metadatenspeicherdienst bereits gestartet wurde.
Informationen zum Starten eines Metastore-Dienstes finden Sie unter Dataproc Metastore aktivieren und deaktivieren.
Ausführliche Informationen zum Batch-Operator zum Erstellen der Konfiguration finden Sie unter PeripheralsConfig.
Geben Sie den Namen des Metastore-Dienstes in der Variablen
metastore_cluster
und die Region in derregion_name
-Airflow-Variable an, sobald er betriebsbereit ist.Metastore-Dienst in DataprocCreateBatchOperator verwenden:
DataprocDeleteBatchOperator
Mit DataprocDeleteBatchOperator können Sie einen Batch anhand der Batch-ID der Arbeitslast löschen.
DataprocListBatchesOperator
Mit DataprocDeleteBatchOperator werden Batches aufgelistet, die in einer bestimmten project_id und Region vorhanden sind.
DataprocGetBatchOperator
Mit DataprocGetBatchOperator wird eine bestimmte Batcharbeitslast abgerufen.