Utilizzo di KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina descrive come utilizzare KubernetesPodOperator per eseguire il deployment di pod Kubernetes da Cloud Composer nel cluster Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer.

KubernetesPodOperator avvia pod Kubernetes nel cluster del tuo ambiente. Al contrario, gli operatori Google Kubernetes Engine eseguono i pod Kubernetes in un cluster specificato, che può essere un cluster separato non correlato al tuo ambiente. Puoi anche creare ed eliminare cluster utilizzando gli operatori Google Kubernetes Engine.

KubernetesPodOperator è una buona opzione se hai bisogno di:

  • Dipendenze Python personalizzate non disponibili tramite il repository PyPI pubblico.
  • Dipendenze binarie non disponibili nell'immagine worker di Cloud Composer stock.

Prima di iniziare

Controlla il seguente elenco di differenze tra KubernetesPodOperator in Cloud Composer 3 e Cloud Composer 2 e assicurati che i tuoi DAG siano compatibili:

  • Non è possibile creare spazi dei nomi personalizzati in Cloud Composer 3. I pod vengono sempre eseguiti nello spazio dei nomi composer-user-workloads, anche se viene specificato uno spazio dei nomi diverso. I pod in questo spazio dei nomi hanno accesso alle risorse e alla rete VPC del tuo progetto (se abilitata) senza configurazione aggiuntiva.

  • Non è possibile eseguire più container sidecar aggiuntivi in Cloud Composer 3. Puoi eseguire un singolo container sidecar se è denominato airflow-xcom-sidecar.

  • Non è possibile creare Kubernetes Secrets e ConfigMaps utilizzando l'API Kubernetes. Cloud Composer fornisce invece comandi Google Cloud CLI, risorse Terraform e l'API Cloud Composer per gestire i secret Kubernetes e ConfigMaps. Per maggiori informazioni, consulta Utilizzare i secret e le ConfigMap di Kubernetes.

  • Non è possibile eseguire il deployment di carichi di lavoro personalizzati in Cloud Composer 3. Solo i volumi Secret e ConfigMap di Kubernetes possono essere modificati, ma tutte le altre modifiche alla configurazione non sono possibili.

  • I requisiti di risorse (CPU, memoria e spazio di archiviazione) devono essere specificati utilizzando i valori supportati.

  • Come in Cloud Composer 2, la configurazione dell'affinità dei pod non è disponibile. Se vuoi utilizzare l'affinità dei pod, utilizza gli operatori GKE per avviare i pod in un cluster diverso.

Informazioni su KubernetesPodOperator in Cloud Composer 3

Questa sezione descrive il funzionamento di KubernetesPodOperator in Cloud Composer 3.

Utilizzo delle risorse

In Cloud Composer 3, il cluster del tuo ambiente viene scalato automaticamente. I carichi di lavoro aggiuntivi che esegui utilizzando KubernetesPodOperator vengono scalati indipendentemente dal tuo ambiente. Il tuo ambiente non è interessato dall'aumento della domanda di risorse, ma il cluster del tuo ambiente viene scalato in base alla domanda di risorse.

I prezzi dei carichi di lavoro aggiuntivi che esegui nel cluster del tuo ambiente seguono il modello di prezzo di Cloud Composer 3 e utilizzano gli SKU di Cloud Composer 3.

Cloud Composer 3 utilizza cluster Autopilot che introducono il concetto di classi di computing:

  • Cloud Composer supporta solo la classe di calcolo general-purpose.

  • Per impostazione predefinita, se non viene selezionata alcuna classe, viene presunta la classe general-purpose quando crei pod utilizzando KubernetesPodOperator.

  • Ogni classe è associata a proprietà e limiti delle risorse specifici. Puoi saperne di più nella documentazione di Autopilot. Ad esempio, i pod eseguiti all'interno della classe general-purpose possono utilizzare fino a 110 GiB di memoria.

Accesso alle risorse del progetto

In Cloud Composer 3, il cluster dell'ambiente si trova nel progetto tenant e non è possibile configurarlo. I pod vengono eseguiti nel cluster dell'ambiente, in uno spazio dei nomi isolato.

In Cloud Composer 3, i pod vengono sempre eseguiti nello spazio dei nomi composer-user-workloads, anche se ne viene specificato uno diverso. I pod in questo spazio dei nomi possono accedere Google Cloud alle risorse nel tuo progetto e alla rete VPC (se è abilitata) senza configurazione aggiuntiva. Per accedere a queste risorse viene utilizzato il service account del tuo ambiente. Non è possibile specificare un account di servizio diverso.

Configurazione minima

Per creare un KubernetesPodOperator, sono necessari solo i parametri name, image da utilizzare e task_id del pod. /home/airflow/composer_kube_config contiene le credenziali per l'autenticazione a GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configurazione aggiuntiva

Questo esempio mostra parametri aggiuntivi che puoi configurare in KubernetesPodOperator.

Per saperne di più, consulta le seguenti risorse:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Utilizzare i modelli Jinja

Airflow supporta i modelli Jinja nei DAG.

Devi dichiarare i parametri Airflow obbligatori (task_id, name e image) con l'operatore. Come mostrato nell'esempio seguente, puoi creare un modello per tutti gli altri parametri con Jinja, inclusi cmds, arguments, env_vars e config_file.

Il parametro env_vars nell'esempio è impostato da una variabile Airflow denominata my_value. Il DAG di esempio ottiene il suo valore dalla variabile modello vars in Airflow. Airflow ha più variabili che forniscono l'accesso a diversi tipi di informazioni. Ad esempio, puoi utilizzare la variabile modello conf per accedere ai valori delle opzioni di configurazione di Airflow. Per ulteriori informazioni e l'elenco delle variabili disponibili in Airflow, consulta la sezione Riferimento ai modelli nella documentazione di Airflow.

Senza modificare il DAG o creare la variabile env_vars, l'attività ex-kube-templates nell'esempio non va a buon fine perché la variabile non esiste. Crea questa variabile nell'interfaccia utente di Airflow o con Google Cloud CLI:

UI di Airflow

  1. Vai all'UI di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Variabili.

  3. Nella pagina List Variable (Variabile elenco), fai clic su Add a new record (Aggiungi un nuovo record).

  4. Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Val: example_value
  5. Fai clic su Salva.

gcloud

Inserisci questo comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Sostituisci:

  • ENVIRONMENT con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

Il seguente esempio mostra come utilizzare i modelli Jinja con KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Utilizzare i secret e le ConfigMap di Kubernetes

Un secret di Kubernetes è un oggetto che contiene dati sensibili. Un ConfigMap Kubernetes è un oggetto che contiene dati non riservati in coppie chiave-valore.

In Cloud Composer 3, puoi creare Secrets e ConfigMaps utilizzando Google Cloud CLI, l'API o Terraform, quindi accedervi da KubernetesPodOperator:

  • Con Google Cloud CLI e API, fornisci un file di configurazione YAML.
  • Con Terraform, definisci i secret e le configMap come risorse separate nei file di configurazione Terraform.

Informazioni sui file di configurazione YAML

Quando crei un secret Kubernetes o un oggetto ConfigMap utilizzando Google Cloud CLI e l'API, fornisci un file in formato YAML. Questo file deve seguire lo stesso formato utilizzato da secret e ConfigMap di Kubernetes. La documentazione di Kubernetes fornisce molti esempi di codice di ConfigMap e Secret. Per iniziare, puoi consultare la pagina Distribuire le credenziali in modo sicuro utilizzando i secret e ConfigMaps.

Come nei secret di Kubernetes, utilizza la rappresentazione Base64 quando definisci i valori nei secret.

Per codificare un valore, puoi utilizzare il seguente comando (questo è uno dei tanti modi per ottenere un valore con codifica Base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Output:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

I due esempi di file YAML riportati di seguito vengono utilizzati negli esempi più avanti in questa guida. File di configurazione YAML di esempio per un secret Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Un altro esempio che mostra come includere i file. Come nell'esempio precedente, codifica prima i contenuti di un file (cat ./key.json | base64), poi fornisci questo valore nel file YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Un file di configurazione YAML di esempio per un oggetto ConfigMap. Non è necessario utilizzare la rappresentazione base64 in ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gestisci i secret Kubernetes

gcloud

Creare un secret

Per creare un secret di Kubernetes, esegui questo comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_FILE: il percorso di un file YAML locale che contiene la configurazione del secret.

Esempio:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Aggiornare un secret

Per aggiornare un secret di Kubernetes, esegui il comando seguente. Il nome del secret verrà recuperato dal file YAML specificato e i contenuti del secret verranno sostituiti.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_FILE: il percorso di un file YAML locale che contiene la configurazione del secret. Specifica il nome del secret nel campo metadata > name di questo file.

Elenca secret

Per ottenere un elenco dei secret e dei relativi campi per un ambiente, esegui il comando seguente. I valori chiave nell'output verranno sostituiti con asterischi.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Visualizzare i dettagli del segreto

Per ottenere informazioni dettagliate su un secret, esegui questo comando. I valori della chiave nell'output verranno sostituiti con asterischi.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • SECRET_NAME: il nome del secret, come definito nel campo metadata > name del file YAML con la configurazione del secret.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Eliminare un secret

Per eliminare un secret, esegui questo comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: il nome del secret, come definito nel campo metadata > name del file YAML con la configurazione del secret.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

API

Creare un secret

  1. Crea una richiesta API environments.userWorkloadsSecrets.create.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del nuovo secret.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori con codifica Base64 per il secret.

Esempio:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Aggiornare un secret

  1. Crea una richiesta API environments.userWorkloadsSecrets.update.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del secret.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori con codifica Base64 per il secret. I valori verranno sostituiti.

Esempio:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

Elenca secret

Crea una richiesta API environments.userWorkloadsSecrets.list. I valori delle chiavi nell'output verranno sostituiti con asterischi. È possibile utilizzare la paginazione con questa richiesta. Per maggiori dettagli, consulta il riferimento della richiesta.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Visualizzare i dettagli del segreto

Crea una richiesta API environments.userWorkloadsSecrets.get. I valori delle chiavi nell'output verranno sostituiti con asterischi.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Eliminare un secret

Crea una richiesta API environments.userWorkloadsSecrets.delete.

Esempio:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

La risorsa google_composer_user_workloads_secret definisce un secret Kubernetes, con chiavi e valori definiti nel blocco data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: il nome della risorsa dell'ambiente, che contiene la definizione dell'ambiente in Terraform. In questa risorsa è specificato anche il nome dell'ambiente effettivo.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_NAME: il nome del secret.
  • KEY_NAME: una o più chiavi per questo secret.
  • KEY_VALUE: valore codificato in Base64 per la chiave. Puoi utilizzare la funzione base64encode per codificare il valore (vedi l'esempio).

I due esempi seguenti di secret di Kubernetes vengono utilizzati negli esempi più avanti in questa guida.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Un altro esempio che mostra come includere i file. Puoi utilizzare la funzione file per leggere i contenuti del file come stringa e poi codificarli in Base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Utilizzare i secret Kubernetes nei DAG

Questo esempio mostra due modi di utilizzare i secret di Kubernetes: come variabile di ambiente e come volume montato dal pod.

Il primo secret, airflow-secrets, è impostato su una variabile di ambiente Kubernetes denominata SQL_CONN (anziché su una variabile di ambiente Airflow o Cloud Composer).

Il secondo secret, service-account, monta service-account.json, un file con un token del account di servizio, su /var/secrets/google.

Ecco l'aspetto degli oggetti segreti:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Il nome del primo secret di Kubernetes è definito nella variabile secret_env. Questo secret si chiama airflow-secrets. Il parametro deploy_type specifica che deve essere esposto come variabile di ambiente. Il nome della variabile di ambiente è SQL_CONN, come specificato nel parametro deploy_target. Infine, il valore della variabile di ambiente SQL_CONN viene impostato sul valore della chiave sql_alchemy_conn.

Il nome del secondo secret di Kubernetes è definito nella variabile secret_volume. Questo secret si chiama service-account. Viene esposto come volume, come specificato nel parametro deploy_type. Il percorso del file da montare, deploy_target, è /var/secrets/google. Infine, il key del secret archiviato in deploy_target è service-account.json.

Ecco come si presenta la configurazione dell'operatore:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Gestisci ConfigMap di Kubernetes

gcloud

Crea un ConfigMap

Per creare un oggetto ConfigMap, esegui questo comando:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_FILE: il percorso di un file YAML locale che contiene la configurazione di ConfigMap.

Esempio:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Aggiorna un ConfigMap

Per aggiornare un oggetto ConfigMap, esegui il comando seguente. Il nome di ConfigMaps verrà estratto dal file YAML specificato e i contenuti di ConfigMaps verranno sostituiti.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_FILE: il percorso di un file YAML locale che contiene la configurazione di ConfigMap. Specifica il nome di ConfigMap nel campo metadata > name di questo file.

Elenca ConfigMap

Per ottenere un elenco di ConfigMap e dei relativi campi per un ambiente, esegui il comando seguente. I valori delle chiavi nell'output verranno visualizzati così come sono.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Visualizzare i dettagli di ConfigMap

Per ottenere informazioni dettagliate su un oggetto ConfigMap, esegui il seguente comando. I valori delle chiavi nell'output verranno visualizzati così come sono.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • CONFIG_MAP_NAME: il nome di ConfigMap, come definito nel campo metadata > name del file YAML con la configurazione di ConfigMap.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Eliminare un ConfigMap

Per eliminare un oggetto ConfigMap, esegui questo comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: il nome di ConfigMap, come definito nel campo metadata > name del file YAML con la configurazione di ConfigMap.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

API

Crea un ConfigMap

  1. Crea una richiesta API environments.userWorkloadsConfigMaps.create.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del nuovo ConfigMap.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori per ConfigMap.

Esempio:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Aggiorna un ConfigMap

  1. Crea una richiesta API environments.userWorkloadsConfigMaps.update.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI di ConfigMap.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori per ConfigMap. I valori verranno sostituiti.

Esempio:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Elenca ConfigMap

Crea una richiesta API environments.userWorkloadsConfigMaps.list. I valori delle chiavi nell'output verranno visualizzati così come sono. È possibile utilizzare la paginazione con questa richiesta. Per ulteriori dettagli, consulta il riferimento della richiesta.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Visualizzare i dettagli di ConfigMap

Crea una richiesta API environments.userWorkloadsConfigMaps.get. I valori delle chiavi nell'output verranno visualizzati così come sono.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Eliminare un ConfigMap

Crea una richiesta API environments.userWorkloadsConfigMaps.delete.

Esempio:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

La risorsa google_composer_user_workloads_config_map definisce un ConfigMap, con chiavi e valori definiti nel blocco data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: il nome della risorsa dell'ambiente, che contiene la definizione dell'ambiente in Terraform. In questa risorsa è specificato anche il nome dell'ambiente effettivo.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_NAME: il nome di ConfigMap.
  • KEY_NAME: una o più chiavi per questo ConfigMap.
  • KEY_VALUE: Valore della chiave.

Esempio:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Utilizzare ConfigMaps nei DAG

Questo esempio mostra come utilizzare ConfigMaps nei DAG.

Nell'esempio seguente, un oggetto ConfigMap viene passato nel parametro configmaps. Tutte le chiavi di questo ConfigMap sono disponibili come variabili di ambiente:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Il seguente esempio mostra come montare un ConfigMap come volume:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informazioni sul fornitore Kubernetes CNCF

KubernetesPodOperator è implementato nel provider apache-airflow-providers-cncf-kubernetes.

Per le note di rilascio dettagliate del provider CNCF Kubernetes, consulta il sito web del provider CNCF Kubernetes.

Requisiti di risorse

Cloud Composer 3 supporta i seguenti valori per i requisiti delle risorse. Per un esempio di utilizzo dei requisiti delle risorse, vedi Configurazione aggiuntiva.

Risorsa Minimo Massimo Passaggio
CPU 0,25 32 Valori del passo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. I valori richiesti vengono arrotondati al valore di passo supportato più vicino (ad esempio, da 5 a 6).
Memoria 2G (GB) 128G (GB) Valori passo: 2, 3, 4, 5, ..., 128. I valori richiesti vengono arrotondati al valore di passo supportato più vicino (ad esempio, 3,5 G a 4 G).
Archiviazione - 100G (GB) Qualsiasi valore. Se vengono richiesti più di 100 GB, ne vengono forniti solo 100 GB.

Per ulteriori informazioni sulle unità di risorse in Kubernetes, consulta Unità di risorse in Kubernetes.

Risoluzione dei problemi

Questa sezione fornisce consigli per la risoluzione dei problemi comuni di KubernetesPodOperator:

Visualizza i log

Quando risolvi i problemi, puoi controllare i log nel seguente ordine:

  1. Log delle attività Airflow:

    1. Nella console Google Cloud , vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.

    3. Vai alla scheda DAG.

    4. Fai clic sul nome del DAG, quindi fai clic sull'esecuzione del DAG per visualizzare i dettagli e i log.

  2. Log dello scheduler Airflow:

    1. Vai alla pagina Dettagli ambiente.

    2. Vai alla scheda Log.

    3. Ispeziona i log dello scheduler Airflow.

  3. Log dei workload utente:

    1. Vai alla pagina Dettagli ambiente.

    2. Vai alla scheda Monitoraggio.

    3. Seleziona Carichi di lavoro utente.

    4. Esamina l'elenco dei workload eseguiti. Puoi visualizzare i log e le informazioni sull'utilizzo delle risorse per ogni carico di lavoro.

Codici restituiti diversi da zero

Quando utilizzi KubernetesPodOperator (e GKEStartPodOperator), il codice restituito del punto di ingresso del container determina se l'attività è considerata riuscita o meno. I codici restituiti diversi da zero indicano un errore.

Un pattern comune è eseguire uno script shell come punto di ingresso del container per raggruppare più operazioni all'interno del container.

Se stai scrivendo uno script di questo tipo, ti consigliamo di includere il comando set -e nella parte superiore dello script in modo che i comandi non riusciti nello script terminino lo script e propaghino l'errore all'istanza dell'attività Airflow.

Timeout dei pod

Il timeout predefinito per KubernetesPodOperator è di 120 secondi, il che può comportare timeout prima del download di immagini più grandi. Puoi aumentare il timeout modificando il parametro startup_timeout_seconds quando crei KubernetesPodOperator.

Quando un pod va in timeout, il log specifico dell'attività è disponibile nell'interfaccia utente di Airflow. Ad esempio:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

I timeout dei pod possono verificarsi anche quando l'account di servizio Cloud Composer non dispone delle autorizzazioni IAM necessarie per eseguire l'attività in questione. Per verificarlo, esamina gli errori a livello di pod utilizzando le dashboard GKE per visualizzare i log del tuo carico di lavoro specifico oppure utilizza Cloud Logging.

Le attività KubernetesPodOperator non riescono quando viene eseguito un numero elevato di attività

Quando il tuo ambiente esegue un numero elevato di attività KubernetesPodOperator o KubernetesExecutor contemporaneamente, Cloud Composer 3 non accetta nuove attività finché alcune di quelle esistenti non sono terminate.

Per saperne di più sulla risoluzione di questo problema, vedi Risoluzione dei problemi relativi alle attività KubernetesExecutor.

Passaggi successivi