KubernetesPodOperator verwenden.

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird beschrieben, wie Sie mit KubernetesPodOperator Kubernetes-Pods von Cloud Composer in den Google Kubernetes Engine-Cluster Ihrer Cloud Composer-Umgebung bereitstellen.

KubernetesPodOperator startet Kubernetes-Pods im Cluster Ihrer Umgebung. Im Vergleich dazu Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einer bestimmten Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem zu verbessern. Sie können Cluster auch mit Google Kubernetes Engine-Operatoren erstellen und löschen.

KubernetesPodOperator ist eine gute Option, wenn Sie Folgendes benötigen:

  • Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
  • Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.

Hinweise

In der folgenden Liste finden Sie die Unterschiede zwischen KubernetesPodOperator in Cloud Composer 3 und Cloud Composer 2. Prüfen Sie, ob Ihre DAGs kompatibel sind:

  • Es ist nicht möglich, benutzerdefinierte Namespaces in Cloud Composer 3 zu erstellen. Pods werden immer im Namespace composer-user-workloads ausgeführt, auch wenn ein anderer Namespace angegeben ist. Pods in diesem Namespace haben Zugriff auf Ihre Projektressourcen und VPC-Netzwerk (falls aktiviert) ohne zusätzliche Konfiguration.

  • Kubernetes-Secrets und -ConfigMaps können nicht mit der Kubernetes API erstellt werden. Stattdessen bietet Cloud Composer Google Cloud CLI-Befehle, Terraform-Ressourcen und die Cloud Composer API zum Verwalten von Kubernetes-Secrets und ConfigMaps. Weitere Informationen finden Sie unter Kubernetes-Secrets und -ConfigMaps verwenden

  • Wie in Cloud Composer 2 ist auch die Pod-Affinitätskonfiguration nicht verfügbar. Wenn Sie die Pod-Affinität verwenden möchten, starten Sie Pods stattdessen mit den GKE-Operatoren in einem anderen Cluster.

KubernetesPodOperator in Cloud Composer 3

In diesem Abschnitt wird beschrieben, wie KubernetesPodOperator in Cloud Composer 3 funktioniert.

Ressourcennutzung

In Cloud Composer 3 wird der Cluster Ihrer Umgebung automatisch skaliert. Zusätzliche Arbeitslasten, die Sie mit KubernetesPodOperator ausführen, werden unabhängig von Ihrer Umgebung skaliert. Ihr ist nicht von der erhöhten Ressourcennachfrage betroffen, der Cluster Ihrer Umgebung je nach Ressource hoch- und herunterskaliert, Nachfrage.

Die Preise für die zusätzlichen Arbeitslasten, die Sie im Cluster Ihrer Umgebung ausführen folgt dem Preismodell für Cloud Composer 3 und verwendet Cloud Composer 3-SKUs.

Cloud Composer 3 verwendet Autopilot-Cluster, die das Konzept von Compute-Klassen einführen:

  • Cloud Composer unterstützt nur die Compute-Klasse general-purpose.

  • Wenn keine Klasse ausgewählt ist, wird standardmäßig die Klasse general-purpose verwendet, wenn Sie Pods mit KubernetesPodOperator erstellen.

  • Jede Klasse ist mit bestimmten Eigenschaften und Ressourcenlimits verknüpft. Weitere Informationen finden Sie in der Autopilot-Dokumentation. Pods, die in der general-purpose-Klasse ausgeführt werden, können beispielsweise bis zu 110 GiB Arbeitsspeicher verbrauchen.

Zugriff auf die Ressourcen des Projekts

In Cloud Composer 3 befindet sich der Cluster Ihrer Umgebung Im Mandantenprojekt werden Pods im Mandantenprojekt der Umgebung ausgeführt. in einem isolierten Namespace.

In Cloud Composer 3 werden Pods immer im composer-user-workloads ausgeführt -Namespace, auch wenn ein anderer Namespace angegeben ist. Pods in diesem Namespace können ohne zusätzliche Konfiguration auf Google Cloud-Ressourcen in Ihrem Projekt und auf Ihr VPC-Netzwerk (falls aktiviert) zugreifen. Der Zugriff auf diese Ressourcen erfolgt über das Dienstkonto Ihrer Umgebung. Es ist nicht möglich, ein anderes Dienstkonto anzugeben.

Minimalkonfiguration

Zum Erstellen eines KubernetesPodOperator müssen nur die name, image und task_id-Parameter sind erforderlich. Das /home/airflow/composer_kube_config enthält Anmeldedaten zur Authentifizierung bei GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Zusätzliche Konfiguration

In diesem Beispiel sind zusätzliche Parameter zu sehen, die Sie im KubernetesPodOperator konfigurieren können.

Weitere Informationen zu Parametern finden Sie in der Airflow-Referenz für KubernetesPodOperator. Informationen zur Verwendung von Kubernetes Secrets und ConfigMaps finden Sie unter Kubernetes Secrets und ConfigMaps verwenden. Für Informationen zur Verwendung von Jinja-Vorlagen mit KubernetesPodOperator finden Sie unter Jinja-Vorlagen verwenden

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Jinja-Vorlagen verwenden

Airflow unterstützt Jinja-Vorlagen in DAGs.

Sie müssen die erforderlichen Airflow-Parameter (task_id, name und image) durch den Operator. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds, arguments, env_vars und config_file.

Der Parameter env_vars wird im Beispiel über eine Airflow-Variable namens my_value festgelegt. Der Beispiel-DAG bezieht seinen Wert aus der Vorlagenvariablen vars in Airflow. Airflow bietet mehr Variablen, die Zugriff auf verschiedene Arten von Informationen ermöglichen. So können Sie beispielsweise mit der Vorlagenvariablen conf auf Werte der Airflow-Konfigurationsoptionen zugreifen. Weitere Informationen und eine Liste der in Airflow verfügbaren Variablen finden Sie in der Airflow-Dokumentation unter Referenz zu Vorlagen.

Ohne den DAG zu ändern oder die Variable env_vars zu erstellen, Die Aufgabe ex-kube-templates in diesem Beispiel schlägt fehl, weil die Variable existieren. Erstellen Sie diese Variable in der Airflow-Benutzeroberfläche oder mit der Google Cloud CLI:

Airflow-UI

  1. Rufen Sie die Airflow-UI auf.

  2. Klicken Sie in der Symbolleiste auf Verwaltung > Variablen.

  3. Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.

  4. Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:

    • Key: my_value
    • Val: example_value
  5. Klicken Sie auf Speichern.

gcloud

Geben Sie den folgenden Befehl ein:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Ersetzen Sie:

  • ENVIRONMENT durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Das folgende Beispiel zeigt, wie Jinja-Vorlagen mit KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes-Secrets und -ConfigMaps verwenden

Ein Kubernetes-Secret ist ein Objekt, das sensible Daten enthält. Eine Kubernetes-ConfigMap ist ein Objekt, das nicht vertrauliche Daten in Schlüssel/Wert-Paaren enthält.

In Cloud Composer 3 können Sie Secrets und ConfigMaps mit der Google Cloud CLI, der API oder Terraform erstellen und dann über den KubernetesPodOperator darauf zugreifen:

  • Mit der Google Cloud CLI und API stellen Sie eine YAML-Konfigurationsdatei bereit.
  • Mit Terraform definieren Sie Secrets und ConfigMaps als separate Ressourcen in Terraform-Konfigurationsdateien

YAML-Konfigurationsdateien

Wenn Sie ein Kubernetes-Secret oder eine ConfigMap mithilfe der Google Cloud CLI und API eine Datei im YAML-Format bereitstellen. Diese Datei muss demselben Format entsprechen wie bei Kubernetes-Secrets und ConfigMaps. Kubernetes-Dokumentation bietet viele Codebeispiele für ConfigMaps und Secrets. Für den Einstieg können Sie sieh dir die Anmeldedaten mithilfe von Secrets sicher verteilen und ConfigMaps.

Wie bei Kubernetes-Secrets sollten Sie die Base64-Darstellung verwenden, wenn Sie Werte in Secrets definieren.

Sie können einen Wert mit dem folgenden Befehl codieren. Dies ist eine von vielen Möglichkeiten, einen base64-codierten Wert zu erhalten:

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Ausgabe:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Die folgenden beiden YAML-Dateibeispiele werden später in diesem Leitfaden in Beispielen verwendet. Beispiel für eine YAML-Konfigurationsdatei für ein Kubernetes-Secret:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Ein weiteres Beispiel, das zeigt, wie Dateien eingefügt werden. Wie im vorherigen Beispiel: Codieren Sie zuerst den Inhalt einer Datei (cat ./key.json | base64) und geben Sie diesen Wert dann in der YAML-Datei an:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Beispiel für eine YAML-Konfigurationsdatei für eine ConfigMap. Sie brauchen die Base64- Darstellung in ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes-Secrets verwalten

gcloud

Secret erstellen

Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu erstellen:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • SECRET_FILE: Pfad zu einer lokalen YAML-Datei, die den des Secrets enthält Konfiguration.

Beispiel:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Secret aktualisieren

Führen Sie den folgenden Befehl aus, um ein Kubernetes-Secret zu aktualisieren. Der Name des Secrets wird aus der angegebenen YAML-Datei übernommen und der Inhalt des Secrets wird ersetzt.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.
  • SECRET_FILE: Pfad zu einer lokalen YAML-Datei, die den des Secrets enthält Konfiguration. Geben Sie den Namen des Geheimnisses in diesem Feld an: metadata > name.

Secrets auflisten

Führen Sie den folgenden Befehl aus, um eine Liste der Secrets und ihrer Felder für eine Umgebung abzurufen. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.

Secret-Details abrufen

Führen Sie den folgenden Befehl aus, um ausführliche Informationen zu einem Secret zu erhalten. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • SECRET_NAME: der Name des Secrets, wie er im Feld metadata> name in der YAML-Datei mit der Konfiguration des Secrets definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.

Secret löschen

Führen Sie den folgenden Befehl aus, um ein Secret zu löschen:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: der Name des Secrets gemäß der Definition in metadata > name in der YAML-Datei mit dem Secret-Wert Konfiguration.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.

API

Secret erstellen

  1. Erstellen: environments.userWorkloadsSecrets.create-API

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI für den neues Secret erstellen.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und base64-codierte Werte für das Secret an.

Beispiel:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Secret aktualisieren

  1. Erstellen Sie eine environments.userWorkloadsSecrets.update-API-Anfrage.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI der Secret.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und base64-codierte Werte für das Secret. Die Werte werden ersetzt.

Beispiel:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

Secrets auflisten

Erstellen Sie eine environments.userWorkloadsSecrets.list-API-Anfrage. Schlüssel/Wert-Paare in der Ausgabe werden durch Sternchen ersetzt. Es ist Paginierung kann bei dieser Anfrage verwendet werden. Weitere Informationen finden Sie in der Anfragereferenz zu erhalten Sie weitere Informationen.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Secret-Details abrufen

Erstellen Sie eine environments.userWorkloadsSecrets.get-API-Anfrage. Schlüsselwerte in der Ausgabe werden durch Sternchen ersetzt.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Secret löschen

Erstellen: environments.userWorkloadsSecrets.delete-API

Beispiel:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

Die google_composer_user_workloads_secret Ressource ein Kubernetes-Secret mit Schlüsseln und Werten definiert, Block data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: der Name der Umgebungsressource, die enthält die Definition der Umgebung in Terraform. Die tatsächlichen ist auch der Name der Umgebung in dieser Ressource angegeben.
  • LOCATION: die Region, in der sich die Umgebung befindet.
  • SECRET_NAME: der Name des Secrets.
  • KEY_NAME: mindestens ein Schlüssel für dieses Secret.
  • KEY_VALUE: Base64-codierter Wert für den Schlüssel. Sie können den Wert mit der Funktion base64encode codieren (siehe Beispiel).

Die folgenden beiden Beispiele für Kubernetes-Secrets werden später in diesem Leitfaden in Beispielen verwendet.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Ein weiteres Beispiel, das zeigt, wie Dateien eingebunden werden können. Mit der Funktion file können Sie den Inhalt der Datei als String lesen und dann mit base64 codieren:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Kubernetes-Secrets in DAGs verwenden

In diesem Beispiel werden zwei Möglichkeiten zur Verwendung von Kubernetes Secrets gezeigt: als Umgebung und als vom Pod bereitgestelltes Volume.

Das erste Secret, airflow-secrets, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).

Das zweite Secret, service-account, stellt service-account.json, eine Datei, bereit durch ein Dienstkonto-Token auf /var/secrets/google.

Die Secret-Objekte sehen so aus:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Der Name des ersten Kubernetes-Secrets wird in der Variablen secret_env definiert. Dieses Secret heißt airflow-secrets. Der Parameter deploy_type gibt an, dass sie als Umgebungsvariable bereitgestellt werden muss. Der Wert der Umgebungsvariablen Der Name lautet SQL_CONN, wie im Parameter deploy_target angegeben. Schließlich wird der Wert der Umgebungsvariablen SQL_CONN auf den Wert des Schlüssels sql_alchemy_conn festgelegt.

Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret_volume definiert. Dieses Secret heißt service-account. Sie wird als Lautstärke, wie im Parameter deploy_type angegeben. Der Pfad der bereitzustellenden Datei (deploy_target) lautet /var/secrets/google. Schließlich ist der key der Das im deploy_target gespeicherte Secret ist service-account.json.

Die Operatorkonfiguration sieht so aus:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Kubernetes-ConfigMaps verwalten

gcloud

ConfigMap erstellen

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu erstellen:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_FILE: Pfad zu einer lokalen YAML-Datei, die die Konfiguration des ConfigMaps enthält.

Beispiel:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

ConfigMap aktualisieren

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu aktualisieren. Der Name der ConfigMap wird aus der angegebenen YAML-Datei übernommen und der Inhalt der ConfigMap wird ersetzt.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_FILE: Pfad zur lokalen YAML-Datei, die die ConfigMap-Datei enthält Konfiguration. Geben Sie den Namen der ConfigMap in der metadata > Feld name in dieser Datei.

ConfigMaps auflisten

Führen Sie den Befehl folgenden Befehl. Schlüssel/Wert-Paare werden in der Ausgabe unverändert angezeigt.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

Details von ConfigMap abrufen

Führen Sie den folgenden Befehl aus, um ausführliche Informationen zu einer ConfigMap zu erhalten. Schlüsselwerte in der Ausgabe werden unverändert angezeigt.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Ersetzen Sie Folgendes:

  • CONFIG_MAP_NAME: der Name der ConfigMap, wie er im Feld metadata > name in der YAML-Datei mit der ConfigMap-Konfiguration definiert wurde.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: die Region, in der sich die Umgebung befindet.

ConfigMap löschen

Führen Sie den folgenden Befehl aus, um eine ConfigMap zu löschen:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: der Name der ConfigMap, wie er in den metadata > name in der YAML-Datei mit dem ConfigMap-Konfiguration.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • LOCATION: Region, in der sich die Umgebung befindet.

API

ConfigMap erstellen

  1. Erstellen: environments.userWorkloadsConfigMaps.create API-Anfrage.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI für den neue ConfigMap.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Werte für die ConfigMap an.

Beispiel:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

ConfigMap aktualisieren

  1. Erstellen: environments.userWorkloadsConfigMaps.update API-Anfrage.

  2. In dieser Anfrage:

    1. Geben Sie im Anfragetext im Feld name den URI der ConfigMap an.
    2. Geben Sie im Anfragetext im Feld data Schlüssel und Werte für die ConfigMap an. Die Werte werden ersetzt.

Beispiel:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

ConfigMaps auflisten

Erstellen: environments.userWorkloadsConfigMaps.list-API Schlüsselwerte in der Ausgabe werden unverändert angezeigt. Bei dieser Anfrage kann die Paginierung verwendet werden. Weitere Informationen finden Sie in der Referenz der Anfrage.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Details zur ConfigMap abrufen

Erstellen: environments.userWorkloadsConfigMaps.get-API Schlüssel/Wert-Paare werden in der Ausgabe unverändert angezeigt.

Beispiel:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

ConfigMap löschen

Erstellen Sie eine environments.userWorkloadsConfigMaps.delete-API-Anfrage.

Beispiel:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

Die google_composer_user_workloads_config_map Ressource definiert eine ConfigMap mit Schlüsseln und Werten im Block data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: Der Name der Ressourcen der Umgebung, die die Definition der Umgebung in Terraform enthält. Die tatsächlichen ist auch der Name der Umgebung in dieser Ressource angegeben.
  • LOCATION: Region, in der sich die Umgebung befindet.
  • CONFIG_MAP_NAME: der Name der ConfigMap.
  • KEY_NAME: ein oder mehrere Schlüssel für diese ConfigMap.
  • KEY_VALUE: Wert für den Schlüssel.

Beispiel:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

ConfigMaps in Ihren DAGs verwenden

Dieses Beispiel zeigt, wie Sie ConfigMaps in Ihren DAGs verwenden.

Im folgenden Beispiel wird ein ConfigMap über den Parameter configmaps übergeben. Alle Schlüssel dieser ConfigMap sind als Umgebungsvariablen verfügbar:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Das folgende Beispiel zeigt, wie Sie eine ConfigMap als Volume bereitstellen:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informationen zum CNCF-Kubernetes-Anbieter

KubernetesPodOperator ist im apache-airflow-providers-cncf-kubernetes-Anbieter implementiert.

Detaillierte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie unter Website des CNCF-Kubernetes-Anbieters

Fehlerbehebung

Dieser Abschnitt enthält Tipps zur Fehlerbehebung bei gängigen KubernetesPodOperator Probleme:

Logs ansehen

Prüfen Sie bei der Fehlerbehebung die Protokolle in der folgenden Reihenfolge:

  1. Airflow-Aufgabenlogs:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

      Zur Seite Umgebungen

    2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

    3. Rufen Sie den Tab DAGs auf.

    4. Klicken Sie auf den Namen des DAG und dann auf die Ausführung des DAG, um die Details aufzurufen. und Logs.

  2. Airflow-Planer-Logs:

    1. Rufen Sie die Seite Umgebungsdetails auf.

    2. Rufen Sie den Tab Protokolle auf.

    3. Prüfen Sie die Logs des Airflow-Planers.

  3. Logs für Nutzerarbeitslasten:

    1. Rufen Sie die Seite Umgebungsdetails auf.

    2. Rufen Sie den Tab Monitoring auf.

    3. Wählen Sie Nutzerarbeitslasten aus.

    4. Prüfen Sie die Liste der ausgeführten Arbeitslasten. Sie können sich die Protokolle und Informationen zur Ressourcennutzung für jede Arbeitslast ansehen.

Rückgabecodes ungleich null

Wenn Sie den KubernetesPodOperator (und GKEStartPodOperator) verwenden, gibt den Einstiegspunkt des Containers an, ob die Aufgabe ob sie erfolgreich sind oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.

Ein gängiges Muster besteht darin, ein Shell-Script als Container-Einstiegspunkt auszuführen, um mehrere Vorgänge innerhalb des Containers zusammenzufassen.

Wenn Sie ein solches Skript schreiben, empfehlen wir Ihnen, das set -e-Befehl am Anfang des Skripts, sodass fehlgeschlagene Befehle im Script das Skript beenden und den Fehler an die Airflow-Taskinstanz weitergeben.

Pod-Zeitüberschreitungen

Das Standardzeitlimit für KubernetesPodOperator beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, indem Sie den Parameter startup_timeout_seconds ändern, wenn erstellen Sie den KubernetesPodOperator.

Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod-Zeitüberschreitungen können auch auftreten, wenn Cloud Composer-Dienstkonto nicht über die erforderlichen IAM-Berechtigungen verfügt, um die Aufgabe am Hand. Überprüfen Sie dies, indem Sie sich Fehler auf Pod-Ebene mithilfe der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.

Nächste Schritte