Utilizzo di KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina descrive come utilizzare KubernetesPodOperator per eseguire il deployment di pod Kubernetes da Cloud Composer nel cluster Google Kubernetes Engine che fa parte del tuo ambiente Cloud Composer.

KubernetesPodOperator avvia pod Kubernetes nel cluster del tuo ambiente. In confronto, gli operatori Google Kubernetes Engine eseguono pod Kubernetes in un cluster specificato, che può essere un cluster separato non correlato al tuo ambiente. Puoi anche creare ed eliminare i cluster utilizzando gli operatori Google Kubernetes Engine.

KubernetesPodOperator è una buona opzione se hai bisogno di:

  • Dipendenze Python personalizzate non disponibili tramite il repository pubblico PyPI.
  • Dipendenze binarie non disponibili nell'immagine worker Cloud Composer di serie.

Prima di iniziare

Controlla il seguente elenco di differenze tra KubernetesPodOperator in Cloud Composer 3 e Cloud Composer 2 e assicurati che i tuoi DAG siano compatibili:

  • Non è possibile creare spazi dei nomi personalizzati in Cloud Composer 3. I pod vengono sempre eseguiti nello spazio dei nomi composer-user-workloads, anche se viene specificato uno spazio dei nomi diverso. I pod in questo spazio dei nomi hanno accesso alle risorse e alla rete VPC (se abilitata) del progetto senza configurazione aggiuntiva.

  • Non è possibile creare secret e ConfigMap Kubernetes utilizzando l'API Kubernetes. Cloud Composer fornisce invece comandi Google Cloud CLI, risorse Terraform e l'API Cloud Composer per gestire i secret e i ConfigMap di Kubernetes. Per maggiori informazioni, consulta Utilizzare secret e ConfigMap di Kubernetes.

  • Non è possibile eseguire il deployment di carichi di lavoro personalizzati in Cloud Composer 3. Solo i secret e i ConfigMap di Kubernetes possono essere modificati, ma tutte le altre modifiche alla configurazione non sono possibili.

  • I requisiti delle risorse (CPU, memoria e spazio di archiviazione) devono essere specificati utilizzando valori supportati.

  • Come in Cloud Composer 2, la configurazione dell'affinità dei pod non è disponibile. Se vuoi utilizzare l'affinità dei pod, utilizza gli operatori GKE per lanciarli in un altro cluster.

Informazioni su KubernetesPodOperator in Cloud Composer 3

Questa sezione descrive il funzionamento di KubernetesPodOperator in Cloud Composer 3.

Utilizzo delle risorse

In Cloud Composer 3, il cluster del tuo ambiente viene scalato automaticamente. I carichi di lavoro aggiuntivi eseguiti con KubernetesPodOperator si scalano indipendentemente dal tuo ambiente. Il tuo ambiente non è interessato dall'aumento della domanda di risorse, ma il cluster del tuo ambiente aumenta e diminuisce in base alla domanda di risorse.

I prezzi dei carichi di lavoro aggiuntivi eseguiti nel cluster del tuo ambienteseguono il modello di prezzo di Cloud Composer 3 e utilizzano gli SKU di Cloud Composer 3.

Cloud Composer 3 utilizza i cluster Autopilot che introducono il concetto di classi di calcolo:

  • Cloud Composer supporta solo la classe di calcolo general-purpose.

  • Per impostazione predefinita, se non è selezionata alcuna classe, viene presupposta la classe general-purpose quando crei i pod utilizzando KubernetesPodOperator.

  • A ogni classe sono associati limiti di risorse e proprietà specifiche. Puoi trovare informazioni al riguardo nella documentazione di Autopilot. Ad esempio, i pod che vengono eseguiti all'interno della classe general-purpose possono utilizzare fino a 110 GiB di memoria.

Accesso alle risorse del progetto

In Cloud Composer 3, il cluster dell'ambiente si trova nel progetto del tenant, i pod vengono eseguiti nel cluster dell'ambiente, in uno spazio dei nomi isolato.

In Cloud Composer 3, i pod vengono sempre eseguiti nello spazio dei nomi composer-user-workloads anche se ne viene specificato uno diverso. I pod in questo spazio dei nomi possono accedere Google Cloud alle risorse del progetto e alla rete VPC (se attivata) senza configurazione aggiuntiva. Per accedere a queste risorse viene utilizzato l'account di servizio del tuo ambiente. Non è possibile specificare un altro account di servizio.

Configurazione minima

Per creare un KubernetesPodOperator, sono obbligatori solo i parametri name, image da utilizzare e task_id del pod. Il file /home/airflow/composer_kube_config contiene le credenziali per l'autenticazione in GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configurazione aggiuntiva

Questo esempio mostra i parametri aggiuntivi che puoi configurare in KubernetesPodOperator.

Per saperne di più, consulta le seguenti risorse:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Utilizzare i modelli Jinja

Airflow supporta i modelli Jinja nei DAG.

Devi dichiarare i parametri Airflow richiesti (task_id, name e image) all'operatore. Come mostrato nell'esempio seguente, puoi creare un modello per tutti gli altri parametri con Jinja, inclusi cmds, arguments, env_vars e config_file.

Il parametro env_vars nell'esempio viene impostato da una variabile Airflow denominata my_value. Il DAG di esempio ricava il proprio valore dalla variabile del modello vars in Airflow. Airflow ha più variabili che forniscono l'accesso a diversi tipi di informazioni. Ad esempio, puoi utilizzare la variabile del modello conf per accedere ai valori delle opzioni di configurazione di Airflow. Per ulteriori informazioni e l'elenco delle variabili disponibili in Airflow, consulta la sezione Riferimento ai modelli nella documentazione di Airflow.

Se non modifichi il DAG o non crei la variabile env_vars, l'attività ex-kube-templates nell'esempio non va a buon fine perché la variabile non esiste. Crea questa variabile nell'interfaccia utente di Airflow o con Google Cloud CLI:

UI di Airflow

  1. Vai all'interfaccia utente di Airflow.

  2. Nella barra degli strumenti, seleziona Amministrazione > Voci.

  3. Nella pagina List Variable (Variabile elenco), fai clic su Add a new record (Aggiungi un nuovo record).

  4. Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:

    • Chiave:my_value
    • Val: example_value
  5. Fai clic su Salva.

gcloud

Inserisci questo comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Sostituisci:

  • ENVIRONMENT con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

L'esempio seguente mostra come utilizzare i modelli Jinja con KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Utilizzare secret e ConfigMap di Kubernetes

Un secret Kubernetes è un oggetto che contiene dati sensibili. Un ConfigMap Kubernetes è un oggetto che contiene dati non riservati in coppie chiave/valore.

In Cloud Composer 3, puoi creare secret e ConfigMap utilizzando Google Cloud CLI, l'API o Terraform, quindi accedervi da KubernetesPodOperator:

  • Con Google Cloud CLI e API, fornisci un file di configurazione YAML.
  • Con Terraform, definisci i secret e i ConfigMap come risorse separate nei file di configurazione Terraform.

Informazioni sui file di configurazione YAML

Quando crei un secret o un ConfigMap Kubernetes utilizzando l'API e la CLI Google Cloud, fornisci un file in formato YAML. Questo file deve avere lo stesso formato utilizzato da Kubernetes Secret e ConfigMap. La documentazione di Kubernetes fornisce molti esempi di codice di ConfigMap e secret. Per iniziare, consulta la pagina Distribuzione sicura delle credenziali tramite Secret e ConfigMaps.

Come in Kubernetes Secrets, utilizza la rappresentazione base64 quando definisci i valori nei secret.

Per codificare un valore, puoi utilizzare il seguente comando (questo è uno dei molti modi per ottenere un valore con codifica base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Output:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

I due esempi di file YAML riportati di seguito vengono utilizzati negli esempi più avanti in questa guida. Esempio di file di configurazione YAML per un segreto Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Un altro esempio che mostra come includere i file. Come nell'esempio precedente, codifica prima i contenuti di un file (cat ./key.json | base64), quindi fornisci questo valore nel file YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Un esempio di file di configurazione YAML per un ConfigMap. Non è necessario utilizzare la rappresentazione base64 nei ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gestire i secret di Kubernetes

gcloud

Creare un secret

Per creare un secret di Kubernetes, esegui il seguente comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_FILE: percorso di un file YAML locale contenente la configurazione del segreto.

Esempio:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Aggiornare un secret

Per aggiornare un secret di Kubernetes, esegui il comando seguente. Il nome del segreto verrà preso dal file YAML specificato e i contenuti del segreto verranno sostituiti.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_FILE: percorso di un file YAML locale contenente la configurazione del segreto. Specifica il nome del segreto nel campo metadata > name di questo file.

Elenca secret

Per ottenere un elenco di secret e dei relativi campi per un ambiente, esegui il seguente comando. I valori chiave nell'output verranno sostituiti con asterischi.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Visualizzare i dettagli del secret

Per ottenere informazioni dettagliate su un segreto, esegui il seguente comando. I valori chiave nell'output verranno sostituiti con asterischi.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • SECRET_NAME: il nome del segreto, come definito nel campo metadata > name del file YAML con la configurazione del segreto.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Eliminare un secret

Per eliminare un segreto, esegui il seguente comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: il nome del segreto, come definito nel campo metadata > name del file YAML con la configurazione del segreto.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

API

Creare un secret

  1. Crea una richiesta dell'API environments.userWorkloadsSecrets.create.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del nuovo secret.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori con codifica Base64 per il secret.

Esempio:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Aggiornare un secret

  1. Crea una richiesta dell'API environments.userWorkloadsSecrets.update.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del segreto.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori con codifica Base64 per il secret. I valori verranno sostituiti.

Esempio:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

Elenca secret

Crea una richiesta di API environments.userWorkloadsSecrets.list. I valori chiave nell'output verranno sostituiti con asterischi. È possibile utilizzare la paginazione con questa richiesta. Per maggiori dettagli, consulta il riferimento della richiesta.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Visualizzare i dettagli del secret

Crea una richiesta di API environments.userWorkloadsSecrets.get. I valori chiave nell'output verranno sostituiti con asterischi.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Eliminare un secret

Crea una richiesta dell'API environments.userWorkloadsSecrets.delete.

Esempio:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

La risorsa google_composer_user_workloads_secret definisce un secret Kubernetes, con chiavi e valori definiti nel blocco data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: il nome della risorsa dell'ambiente, che contiene la definizione dell'ambiente in Terraform. Il nome dell'ambiente reale è specificato anche in questa risorsa.
  • LOCATION: la regione in cui si trova l'ambiente.
  • SECRET_NAME: il nome del secret.
  • KEY_NAME: una o più chiavi per questo segreto.
  • KEY_VALUE: valore codificato in base64 per la chiave. Puoi utilizzare la funzione base64encode per codificare il valore (vedi l'esempio).

I seguenti due esempi di secret Kubernetes vengono utilizzati negli esempi riportati più avanti in questa guida.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Un altro esempio che mostra come includere i file. Puoi utilizzare la funzione file per leggere i contenuti del file come stringa e poi codificarli in base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Utilizzare i secret Kubernetes nei DAG

Questo esempio mostra due modi per utilizzare i secret di Kubernetes: come variabile di ambiente e come volume montato dal pod.

Il primo segreto, airflow-secrets, è impostato su una variabile di ambiente Kubernetes denominata SQL_CONN (anziché su una variabile di ambiente Airflow o Cloud Composer).

Il secondo secret, service-account, monta service-account.json, un file con un token dell'account di servizio, su /var/secrets/google.

Ecco come sono gli oggetti Secret:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Il nome del primo secret di Kubernetes è definito nella variabile secret_env. Questo secret si chiama airflow-secrets. Il parametro deploy_type specifica che deve essere esposto come variabile di ambiente. Il nome della variabile di ambiente è SQL_CONN, come specificato nel parametro deploy_target. Infine, il valore della variabile di ambiente SQL_CONN viene impostato sul valore della chiave sql_alchemy_conn.

Il nome del secondo secret di Kubernetes è definito nella variabile secret_volume. Questo secret si chiama service-account. Viene visualizzato come volume, come specificato nel parametro deploy_type. Il percorso del file da montare, deploy_target, è /var/secrets/google. Infine, il key del segreto memorizzato nel deploy_target è service-account.json.

Ecco come si presenta la configurazione dell'operatore:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Gestire i ConfigMap di Kubernetes

gcloud

Creare un ConfigMap

Per creare un ConfigMap, esegui il comando seguente:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_FILE: percorso di un file YAML locale contenente la configurazione del ConfigMap.

Esempio:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Aggiornare un ConfigMap

Per aggiornare un ConfigMap, esegui il comando seguente. Il nome dei ConfigMap verrà preso dal file YAML specificato e i contenuti del ConfigMap verranno sostituiti.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_FILE: percorso di un file YAML locale contenente la configurazione del ConfigMap. Specifica il nome del ConfigMap nel campo metadata > name di questo file.

Elenca i ConfigMap

Per ottenere un elenco di ConfigMap e dei relativi campi per un ambiente, esegui il seguente comando. I valori chiave nell'output verranno visualizzati così come sono.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Ottieni i dettagli del ConfigMap

Per ottenere informazioni dettagliate su un ConfigMap, esegui il seguente comando. I valori chiave nell'output verranno visualizzati così come sono.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Sostituisci quanto segue:

  • CONFIG_MAP_NAME: il nome del ConfigMap, come definito nel campo metadata > name del file YAML con la configurazione del ConfigMap.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

Eliminare un ConfigMap

Per eliminare un ConfigMap, esegui il seguente comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: il nome del ConfigMap, come definito nel campo metadata > name del file YAML con la configurazione del ConfigMap.
  • ENVIRONMENT_NAME: il nome del tuo ambiente.
  • LOCATION: la regione in cui si trova l'ambiente.

API

Creare un ConfigMap

  1. Crea una richiesta API environments.userWorkloadsConfigMaps.create.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del nuovo ConfigMap.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori per il ConfigMap.

Esempio:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Aggiornare un ConfigMap

  1. Crea una richiesta API environments.userWorkloadsConfigMaps.update.

  2. In questa richiesta:

    1. Nel corpo della richiesta, nel campo name, specifica l'URI del ConfigMap.
    2. Nel corpo della richiesta, nel campo data, specifica le chiavi e i valori per il ConfigMap. I valori verranno sostituiti.

Esempio:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

Elenca i ConfigMap

Crea una richiesta dell'API environments.userWorkloadsConfigMaps.list. I valori chiave nell'output verranno visualizzati così come sono. È possibile utilizzare la paginazione con questa richiesta. Per maggiori dettagli, consulta il riferimento della richiesta.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Ottieni i dettagli del ConfigMap

Crea una richiesta dell'API environments.userWorkloadsConfigMaps.get. I valori chiave nell'output verranno visualizzati così come sono.

Esempio:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Eliminare un ConfigMap

Crea una richiesta dell'API environments.userWorkloadsConfigMaps.delete.

Esempio:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

La risorsa google_composer_user_workloads_config_map definisce un ConfigMap, con chiavi e valori definiti nel blocco data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: il nome della risorsa dell'ambiente, che contiene la definizione dell'ambiente in Terraform. Il nome dell'ambiente reale è specificato anche in questa risorsa.
  • LOCATION: la regione in cui si trova l'ambiente.
  • CONFIG_MAP_NAME: il nome del ConfigMap.
  • KEY_NAME: una o più chiavi per questo ConfigMap.
  • KEY_VALUE: valore per la chiave.

Esempio:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Utilizzare i ConfigMap nei DAG

Questo esempio mostra come utilizzare i ConfigMap nei DAG.

Nel seguente esempio, un ConfigMap viene passato nel parametro configmaps. Tutte le chiavi di questo ConfigMap sono disponibili come variabili di ambiente:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

L'esempio seguente mostra come montare un ConfigMap come volume:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informazioni sul fornitore Kubernetes CNCF

KubernetesPodOperator è implementato nel fornitore apache-airflow-providers-cncf-kubernetes.

Per le note di rilascio dettagliate del provider Kubernetes CNCF, consulta il sito web del provider Kubernetes CNCF.

Requisiti delle risorse

Cloud Composer 3 supporta i seguenti valori per i requisiti delle risorse. Per un esempio di utilizzo dei requisiti delle risorse, consulta Configurazione aggiuntiva.

Risorsa Minimo Massimo Passaggio
CPU 0,25 32 Valori del passo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. I valori richiesti vengono arrotondati per eccesso al valore del passaggio supportato più vicino (ad es. da 5 a 6).
Memoria 2G (GB) 128G (GB) Valori del passo: 2, 3, 4, 5, ..., 128. I valori richiesti vengono arrotondati per eccesso al valore del passaggio supportato più vicino (ad esempio, da 3,5 G a 4G).
Archiviazione - 100G (GB) Qualsiasi valore. Se vengono richiesti più di 100 GB, vengono forniti solo 100 GB.

Per ulteriori informazioni sulle unità di risorse in Kubernetes, consulta Unità di risorse in Kubernetes.

Risoluzione dei problemi

Questa sezione fornisce consigli per la risoluzione dei problemi comuni di KubernetesPodOperator:

Visualizza i log

Per risolvere i problemi, puoi controllare i log nel seguente ordine:

  1. Log delle attività Airflow:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

    3. Vai alla scheda DAG.

    4. Fai clic sul nome del DAG, quindi sulla relativa esecuzione per visualizzarne i dettagli e i log.

  2. Log dello scheduler Airflow:

    1. Vai alla pagina Dettagli dell'ambiente.

    2. Vai alla scheda Log.

    3. Controlla i log dello scheduler di Airflow.

  3. Log di Workload utente:

    1. Vai alla pagina Dettagli dell'ambiente.

    2. Vai alla scheda Monitoraggio.

    3. Seleziona Carichi di lavoro utente.

    4. Controlla l'elenco dei workload eseguiti. Puoi visualizzare i log e le informazioni sull'utilizzo delle risorse per ogni carico di lavoro.

Codici di ritorno diversi da zero

Quando utilizzi KubernetesPodOperator (e GKEStartPodOperator), il codice di ritorno del punto di contatto del contenitore determina se l'attività è considerata riuscita o meno. I codici di ritorno diversi da zero indicano un errore.

Un pattern comune è eseguire uno script shell come punto di contatto del contenitore per riunire più operazioni al suo interno.

Se stai scrivendo uno script di questo tipo, ti consigliamo di includere il comando set -e nella parte superiore dello script in modo che i comandi non riusciti nello script interrompano lo script e propaghino l'errore all'istanza dell'attività Airflow.

Timeout dei pod

Il timeout predefinito per KubernetesPodOperator è 120 secondi, il che può comportare timeout prima del download di immagini di dimensioni maggiori. Puoi aumentare il timeout modificando il parametro startup_timeout_seconds quando crei KubernetesPodOperator.

Quando un pod scade, il log specifico dell'attività è disponibile nell'interfaccia utente di Airflow. Ad esempio:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

I timeout dei pod possono verificarsi anche quando l'account di servizio Cloud Composer manca delle autorizzazioni IAM necessarie per eseguire l'attività in questione. Per verificare, controlla gli errori a livello di pod utilizzando le dashboard GKE per esaminare i log relativi al tuo determinato carico di lavoro oppure utilizza Cloud Logging.

Le attività KubernetesPodOperator non riescono quando viene eseguito un numero elevato di attività

Quando l'ambiente esegue contemporaneamente un numero elevato di attività KubernetesPodOperator o KubernetesExecutor, Cloud Composer 3 non accetta nuove attività finché alcune di quelle esistenti non sono state completate.

Per ulteriori informazioni sulla risoluzione di questo problema, consulta Risoluzione dei problemi relativi alle attività KubernetesExecutor.

Passaggi successivi