KubernetesPodOperator verwenden.

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird beschrieben, wie Sie mit dem KubernetesPodOperator Kubernetes-Pods von Cloud Composer im Google Kubernetes Engine-Cluster bereitstellen, der Teil Ihrer Cloud Composer-Umgebung ist.

KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Mit Google Kubernetes Engine-Operatoren werden dagegen Kubernetes-Pods in einem bestimmten Cluster ausgeführt. Dabei kann es sich um einen separaten Cluster handeln, der nicht mit Ihrer Umgebung in Verbindung steht. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.

KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:

  • Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
  • Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.

Hinweise

Prüfen Sie die folgende Liste der Unterschiede zwischen KubernetesPodOperator in Cloud Composer 3 und Cloud Composer 2 und sorgen Sie dafür, dass Ihre DAGs kompatibel sind:

  • In Cloud Composer 3 ist es nicht möglich, benutzerdefinierte Namespaces zu erstellen. Pods werden immer im Namespace composer-user-workloads ausgeführt, auch wenn ein anderer Namespace angegeben ist. Pods in diesem Namespace haben ohne zusätzliche Konfiguration Zugriff auf die Ressourcen und das VPC-Netzwerk Ihres Projekts (sofern aktiviert).

  • In Cloud Composer 3 ist es nicht möglich, mehrere zusätzliche Sidecar-Container auszuführen. Sie können einen einzelnen Sidecar-Container ausführen, wenn er den Namen airflow-xcom-sidecar hat.

  • Kubernetes-Secrets und -ConfigMaps können nicht mit der Kubernetes API erstellt werden. Stattdessen bietet Cloud Composer Google Cloud CLI-Befehle, Terraform-Ressourcen und die Cloud Composer API zum Verwalten von Kubernetes-Secrets und ConfigMaps. Weitere Informationen finden Sie unter Kubernetes-Secrets und ConfigMaps verwenden.

  • Es ist nicht möglich, benutzerdefinierte Arbeitslasten in Cloud Composer 3 bereitzustellen. Nur Kubernetes-Secrets und ConfigMaps können geändert werden. Alle anderen Konfigurationsänderungen sind nicht möglich.

  • Ressourcenanforderungen (CPU, Arbeitsspeicher und Speicher) müssen mit unterstützten Werten angegeben werden.

  • Wie in Cloud Composer 2 ist die Pod-Affinitätskonfiguration nicht verfügbar. Wenn Sie die Pod-Affinität verwenden möchten, verwenden Sie stattdessen die GKE-Operatoren, um Pods in einem anderen Cluster zu starten.

KubernetesPodOperator in Cloud Composer 3

In diesem Abschnitt wird beschrieben, wie KubernetesPodOperator in Cloud Composer 3 funktioniert.

Ressourcennutzung

In Cloud Composer 3 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie mit KubernetesPodOperator ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihre Umgebung ist nicht von der erhöhten Ressourcennachfrage betroffen, aber der Cluster Ihrer Umgebung wird je nach Ressourcenbedarf skaliert.

Die Preise für zusätzliche Arbeitslasten, die Sie im Cluster Ihrer Umgebung ausführen, richten sich nach dem Cloud Composer 3-Preismodell und verwenden Cloud Composer 3-Artikelnummern.

Cloud Composer 3 verwendet Autopilot-Cluster, in denen das Konzept von Compute-Klassen eingeführt wird:

  • Cloud Composer unterstützt nur die Compute-Klasse general-purpose.

  • Wenn keine Klasse ausgewählt ist, wird standardmäßig die Klasse general-purpose verwendet, wenn Sie Pods mit KubernetesPodOperator erstellen.

  • Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Pods, die in der Klasse general-purpose ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher verwenden.

Zugriff auf die Ressourcen des Projekts

In Cloud Composer 3 befindet sich der Cluster Ihrer Umgebung im Mandantenprojekt und kann nicht konfiguriert werden. Pods werden im Cluster der Umgebung in einem isolierten Namespace ausgeführt.

In Cloud Composer 3 werden Pods immer im Namespace composer-user-workloads ausgeführt, auch wenn ein anderer Namespace angegeben ist. Pods in diesem Namespace können ohne zusätzliche Konfiguration auf Google CloudRessourcen in Ihrem Projekt und Ihrem VPC-Netzwerk (falls aktiviert) zugreifen. Für den Zugriff auf diese Ressourcen wird das Dienstkonto Ihrer Umgebung verwendet. Es ist nicht möglich, ein anderes Dienstkonto anzugeben.

Minimalkonfiguration

Zum Erstellen eines KubernetesPodOperator sind nur die Parameter name, image und task_id des Pods erforderlich. Die Datei /home/airflow/composer_kube_config enthält Anmeldedaten für die Authentifizierung bei GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Zusätzliche Konfiguration

In diesem Beispiel werden zusätzliche Parameter gezeigt, die Sie im KubernetesPodOperator konfigurieren können.

Weitere Informationen finden Sie in den folgenden Ressourcen:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja-Vorlagen verwenden

Airflow unterstützt Jinja-Vorlagen in DAGs.

Sie müssen die erforderlichen Airflow-Parameter (task_id, name und image) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds, arguments, env_vars und config_file.

Der Parameter env_vars im Beispiel wird aus einer Airflow-Variablen mit dem Namen my_value festgelegt. Der Beispiel-DAG ruft seinen Wert aus der Vorlagenvariablen vars in Airflow ab. Airflow hat mehr Variablen, die Zugriff auf verschiedene Arten von Informationen bieten. Sie können beispielsweise mit der Vorlagenvariablen conf auf Werte von Airflow-Konfigurationsoptionen zugreifen. Weitere Informationen und eine Liste der in Airflow verfügbaren Variablen finden Sie in der Airflow-Dokumentation unter Templates reference.

Wenn der DAG nicht geändert und die Variable env_vars nicht erstellt wird, schlägt die Aufgabe ex-kube-templates im Beispiel fehl, da die Variable nicht vorhanden ist. Erstellen Sie diese Variable in der Airflow-Benutzeroberfläche oder mit der Google Cloud CLI:

Airflow-UI

  1. Rufen Sie die Airflow-UI auf.

  2. Wählen Sie in der Symbolleiste Admin > Variablen aus.

  3. Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.

  4. Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

gcloud

Geben Sie den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Ersetzen Sie:

  • ENVIRONMENT durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Das folgende Beispiel zeigt, wie Sie Jinja-Vorlagen mit KubernetesPodOperator verwenden:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes-Secrets und -ConfigMaps verwenden

Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Eine Kubernetes-ConfigMap ist ein Objekt, das nicht vertrauliche Daten in Schlüssel/Wert-Paaren enthält.

In Cloud Composer 3 können Sie Secrets und ConfigMaps mit der Google Cloud CLI, der API oder Terraform erstellen und dann über KubernetesPodOperator darauf zugreifen:

  • Bei der Google Cloud CLI und API stellen Sie eine YAML-Konfigurationsdatei bereit.
  • Mit Terraform definieren Sie Secrets und ConfigMaps als separate Ressourcen in Terraform-Konfigurationsdateien.

YAML-Konfigurationsdateien

Wenn Sie ein Kubernetes-Secret oder eine ConfigMap mit der Google Cloud CLI und der API erstellen, geben Sie eine Datei im YAML-Format an. Diese Datei muss dasselbe Format wie Kubernetes-Secrets und ConfigMaps haben. In der Kubernetes-Dokumentation finden Sie viele Codebeispiele für ConfigMaps und Secrets. Weitere Informationen finden Sie auf der Seite Anmeldedaten mit Secrets sicher verteilen und unter ConfigMaps.

Wie bei Kubernetes-Secrets müssen Sie die Base64-Darstellung verwenden, wenn Sie Werte in Secrets definieren.

Sie können einen Wert mit dem folgenden Befehl codieren (dies ist nur eine von vielen Möglichkeiten, einen base64-codierten Wert zu erhalten):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Ausgabe:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden. Wie im vorherigen Beispiel codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64) und geben diesen Wert dann in der YAML-Datei an:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie müssen die base64-Darstellung nicht in ConfigMaps verwenden:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes-Secrets verwalten

gcloud

Secret erstellen

Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu erstellen:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • SECRET_FILE: Pfad zu einer lokalen YAML-Datei, die die Konfiguration des Secrets enthält.

Beispiel:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Secret aktualisieren

Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu aktualisieren. Der Name des Secrets wird aus der angegebenen YAML-Datei übernommen und der Inhalt des Secrets wird ersetzt.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • SECRET_FILE: Pfad zu einer lokalen YAML-Datei, die die Konfiguration des Secrets enthält. Geben Sie in dieser Datei den Namen des Secrets im Feld metadata > name an.

Secrets auflisten

Führen Sie den folgenden Befehl aus, um eine Liste der Secrets und ihrer Felder für eine Umgebung abzurufen. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

Secret-Details abrufen

Führen Sie den folgenden Befehl aus, um detaillierte Informationen zu einem Secret zu erhalten. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • SECRET_NAME: Der Name des Secrets, wie er im Feld metadata > name in der YAML-Datei mit der Konfiguration des Secrets definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

Secret löschen

Führen Sie den folgenden Befehl aus, um ein Secret zu löschen:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: Der Name des Secrets, wie er im Feld metadata > name in der YAML-Datei mit der Konfiguration des Secrets definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

API

Secret erstellen

  1. Erstellen Sie eine environments.userWorkloadsSecrets.create-API-Anfrage.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI für das neue Secret an.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Base64-codierte Werte für das Secret an.

Beispiel:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Secret aktualisieren

  1. Erstellen Sie eine environments.userWorkloadsSecrets.update-API-Anfrage.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI des Secret an.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Base64-codierte Werte für das Secret an. Die Werte werden ersetzt.

Beispiel:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

Secrets auflisten

Erstellen Sie eine environments.userWorkloadsSecrets.list-API-Anfrage. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt. Für diese Anfrage ist die Paginierung möglich. Weitere Informationen finden Sie in der Referenz der Anfrage.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Secret-Details abrufen

Erstellen Sie eine environments.userWorkloadsSecrets.get-API-Anfrage. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Secret löschen

Erstellen Sie eine environments.userWorkloadsSecrets.delete-API-Anfrage.

Beispiel:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

Die google_composer_user_workloads_secret-Ressource definiert ein Kubernetes-Secret mit Schlüsseln und Werten, die im data-Block definiert sind.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Der Name der Ressourcen der Umgebung, die die Definition der Umgebung in Terraform enthält. Der Name der tatsächlichen Umgebung wird ebenfalls in dieser Ressource angegeben.
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • SECRET_NAME: der Name des Secrets.
  • KEY_NAME: mindestens ein Schlüssel für dieses Secret.
  • KEY_VALUE: base64-codierter Wert für den Schlüssel. Mit der Funktion base64encode können Sie den Wert codieren (siehe Beispiel).

Die folgenden beiden Beispiele für Kubernetes Secrets werden später in diesem Leitfaden verwendet.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden. Mit der Funktion file können Sie den Inhalt der Datei als String lesen und dann base64-codieren:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Kubernetes-Secrets in Ihren DAGs verwenden

Dieses Beispiel zeigt zwei Möglichkeiten für die Verwendung von Kubernetes Secrets: als Umgebungsvariable und als vom Pod bereitgestelltes Volume.

Das erste Secret, airflow-secrets, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).

Das zweite Secret, service-account, stellt service-account.json, eine Datei mit einem Dienstkontotoken, in /var/secrets/google bereit.

So sehen die Secret-Objekte aus:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env definiert. Dieses Secret heißt airflow-secrets. Der Parameter deploy_type gibt an, dass er als Umgebungsvariable verfügbar gemacht werden muss. Der Name der Umgebungsvariable ist SQL_CONN, wie im Parameter deploy_target angegeben. Schließlich wird der Wert der Umgebungsvariablen SQL_CONN auf den Wert des Schlüssels sql_alchemy_conn festgelegt.

Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret_volume definiert. Dieses Secret heißt service-account. Es wird als Volume bereitgestellt, wie im Parameter deploy_type angegeben. Der Pfad der bereitzustellenden Datei (deploy_target) lautet /var/secrets/google. Schließlich lautet der key des Secrets, das in deploy_target gespeichert ist, service-account.json.

Die Operatorkonfiguration sieht so aus:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes-ConfigMaps verwalten

gcloud

ConfigMap erstellen

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu erstellen:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_FILE: Pfad zu einer lokalen YAML-Datei, die die Konfiguration der ConfigMap enthält.

Beispiel:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

ConfigMap aktualisieren

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu aktualisieren. Der Name der ConfigMap wird aus der angegebenen YAML-Datei übernommen und der Inhalt der ConfigMap wird ersetzt.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_FILE: Pfad zu einer lokalen YAML-Datei, die die Konfiguration der ConfigMap enthält. Geben Sie den Namen der ConfigMap in dieser Datei im Feld metadata > name an.

ConfigMaps auflisten

Führen Sie den folgenden Befehl aus, um eine Liste der ConfigMaps und ihrer Felder für eine Umgebung abzurufen. Schlüsselwerte in der Ausgabe werden unverändert angezeigt.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

Details der ConfigMap abrufen

Führen Sie den folgenden Befehl aus, um detaillierte Informationen zu einer ConfigMap zu erhalten. Schlüsselwerte in der Ausgabe werden unverändert angezeigt.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • CONFIG_MAP_NAME: der Name der ConfigMap, wie er im Feld metadata > name in der YAML-Datei mit der Konfiguration der ConfigMap definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

ConfigMap löschen

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu löschen:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: der Name der ConfigMap, wie er im Feld metadata > name in der YAML-Datei mit der Konfiguration der ConfigMap definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

API

ConfigMap erstellen

  1. Erstellen Sie eine API-Anfrage environments.userWorkloadsConfigMaps.create.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI für die neue ConfigMap an.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Werte für die ConfigMap an.

Beispiel:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

ConfigMap aktualisieren

  1. Erstellen Sie eine API-Anfrage environments.userWorkloadsConfigMaps.update.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI der ConfigMap an.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Werte für die ConfigMap an. Die Werte werden ersetzt.

Beispiel:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

ConfigMaps auflisten

Erstellen Sie eine environments.userWorkloadsConfigMaps.list-API-Anfrage. Schlüsselwerte in der Ausgabe werden unverändert angezeigt. Bei dieser Anfrage ist es möglich, die Paginierung zu verwenden. Weitere Informationen finden Sie in der Referenz zur Anfrage.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Details der ConfigMap abrufen

Erstellen Sie eine environments.userWorkloadsConfigMaps.get-API-Anfrage. Schlüsselwerte in der Ausgabe werden unverändert angezeigt.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

ConfigMap löschen

Erstellen Sie eine environments.userWorkloadsConfigMaps.delete-API-Anfrage.

Beispiel:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

Die Ressource google_composer_user_workloads_config_map definiert eine ConfigMap mit Schlüsseln und Werten, die im Block data definiert sind.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Der Name der Ressourcen der Umgebung, die die Definition der Umgebung in Terraform enthält. Der Name der tatsächlichen Umgebung wird ebenfalls in dieser Ressource angegeben.
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_NAME: der Name der ConfigMap.
  • KEY_NAME: ein oder mehrere Schlüssel für diese ConfigMap.
  • KEY_VALUE: Wert für den Schlüssel.

Beispiel:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

ConfigMaps in Ihren DAGs verwenden

In diesem Beispiel wird gezeigt, wie Sie ConfigMaps in Ihren DAGs verwenden.

Im folgenden Beispiel wird ein ConfigMap-Objekt im Parameter configmaps übergeben. Alle Schlüssel dieser ConfigMap sind als Umgebungsvariablen verfügbar:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Das folgende Beispiel zeigt, wie eine ConfigMap als Volume eingebunden wird:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informationen zum CNCF Kubernetes-Anbieter

KubernetesPodOperator ist im apache-airflow-providers-cncf-kubernetes-Anbieter implementiert.

Ausführliche Versionshinweise zum CNCF Kubernetes-Anbieter finden Sie auf der CNCF Kubernetes Provider-Website.

Ressourcenanforderungen

Cloud Composer 3 unterstützt die folgenden Werte für Ressourcenanforderungen. Ein Beispiel für die Verwendung von Ressourcenanforderungen finden Sie unter Zusätzliche Konfiguration.

Ressource Minimum Maximum Schritt
CPU 0,25 32 Schrittwerte: 0,25, 0,5, 1, 2, 4, 6, 8, 10, …, 32. Angefragte Werte werden auf den nächstgelegenen unterstützten Schrittwert aufgerundet (z. B. 5 auf 6).
Arbeitsspeicher 2G (GB) 128 GB Schrittwerte: 2, 3, 4, 5, ..., 128. Angefragte Werte werden auf den nächstgelegenen unterstützten Schrittwert aufgerundet (z. B. 3, 5G auf 4G).
Speicher - 100 GB Beliebiger Wert. Wenn mehr als 100 GB angefordert werden, werden nur 100 GB bereitgestellt.

Weitere Informationen zu Ressourceneinheiten in Kubernetes finden Sie unter Ressourceneinheiten in Kubernetes.

Fehlerbehebung

In diesem Abschnitt finden Sie Tipps zur Behebung häufiger Probleme mit KubernetesPodOperator:

Logs ansehen

Bei der Fehlerbehebung können Sie die Logs in der folgenden Reihenfolge prüfen:

  1. Airflow-Tasklogs:

    1. Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

    3. Rufen Sie den Tab DAGs auf.

    4. Klicken Sie auf den Namen des DAG und dann auf den DAG-Lauf, um die Details und Logs aufzurufen.

  2. Airflow-Planer-Logs:

    1. Rufen Sie die Seite Umgebungsdetails auf.

    2. Rufen Sie den Tab Logs auf.

    3. Airflow-Planer-Logs prüfen

  3. Logs für Nutzerarbeitslasten:

    1. Rufen Sie die Seite Umgebungsdetails auf.

    2. Rufen Sie den Tab Monitoring auf.

    3. Wählen Sie User Workloads (Nutzerarbeitslasten) aus.

    4. Sehen Sie sich die Liste der ausgeführten Arbeitslasten an. Sie können die Logs und Informationen zur Ressourcennutzung für jede Arbeitslast aufrufen.

Rückgabecodes ungleich null

Wenn Sie KubernetesPodOperator (und GKEStartPodOperator) verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.

Ein gängiges Muster besteht darin, ein Shell-Skript als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.

Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.

Pod-Zeitüberschreitungen

Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator den Parameter startup_timeout_seconds ändern.

Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod-Zeitüberschreitungen können auch auftreten, wenn das Cloud Composer-Dienstkonto nicht die erforderlichen IAM-Berechtigungen zum Ausführen der jeweiligen Aufgabe hat. Das können Sie prüfen, wenn Sie sich in den GKE-Dashboards die Fehler auf Pod-Ebene in den Logs für die jeweilige Arbeitslast ansehen oder Cloud Logging verwenden.

KubernetesPodOperator-Aufgaben schlagen fehl, wenn eine große Anzahl von Aufgaben ausgeführt wird

Wenn in Ihrer Umgebung eine große Anzahl von Aufgaben vom Typ „KubernetesPodOperator“ oder KubernetesExecutor gleichzeitig ausgeführt wird, akzeptiert Cloud Composer 3 keine neuen Aufgaben, bis einige der vorhandenen Aufgaben abgeschlossen sind.

Weitere Informationen zur Fehlerbehebung bei diesem Problem finden Sie unter Fehlerbehebung bei KubernetesExecutor-Aufgaben.

Nächste Schritte