Use o KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar o KubernetesPodOperator para implementar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine que faz parte do seu ambiente do Cloud Composer.

O KubernetesPodOperator inicia pods do Kubernetes no cluster do seu ambiente. Em comparação, os operadores do Google Kubernetes Engine executam pods do Kubernetes num cluster especificado, que pode ser um cluster separado não relacionado com o seu ambiente. Também pode criar e eliminar clusters através dos operadores do Google Kubernetes Engine.

O KubernetesPodOperator é uma boa opção se precisar de:

  • Dependências personalizadas do Python que não estão disponíveis através do repositório público PyPI.
  • Dependências binárias que não estão disponíveis na imagem de trabalho do Cloud Composer padrão.

Antes de começar

Consulte a seguinte lista de diferenças entre o KubernetesPodOperator no Cloud Composer 3 e no Cloud Composer 2 e certifique-se de que os seus DAGs são compatíveis:

  • Não é possível criar espaços de nomes personalizados no Cloud Composer 3. Os pods são sempre executados no espaço de nomes composer-user-workloads, mesmo que seja especificado um espaço de nomes diferente. Os pods neste espaço de nomes têm acesso aos recursos do seu projeto e à rede VPC (se ativada) sem configuração adicional.

  • Não é possível executar vários contentores adicionais sidecar no Cloud Composer 3. Pode executar um único contentor auxiliar se tiver o nome airflow-xcom-sidecar.

  • Não é possível criar Kubernetes Secrets e ConfigMaps através da API Kubernetes. Em alternativa, o Cloud Composer fornece comandos da CLI gcloud, recursos do Terraform e a API Cloud Composer para gerir segredos do Kubernetes e ConfigMaps. Para mais informações, consulte o artigo Use segredos e mapas de configuração do Kubernetes.

  • Não é possível implementar cargas de trabalho personalizadas no Cloud Composer 3. Apenas os segredos e os mapas de configuração do Kubernetes podem ser modificados, mas todas as outras alterações de configuração não são possíveis.

  • Os requisitos de recursos (CPU, memória e armazenamento) têm de ser especificados através de valores suportados.

  • Tal como no Cloud Composer 2, a configuração de afinidade de pods não está disponível. Se quiser usar a afinidade de pods, use os operadores do GKE para iniciar pods num cluster diferente.

Acerca do KubernetesPodOperator no Cloud Composer 3

Esta secção descreve como funciona o KubernetesPodOperator no Cloud Composer 3.

Utilização de recursos

No Cloud Composer 3, o cluster do seu ambiente é dimensionado automaticamente. As cargas de trabalho adicionais que executa com o KubernetesPodOperator são dimensionadas independentemente do seu ambiente. O seu ambiente não é afetado pelo aumento da procura de recursos, mas o cluster do seu ambiente é dimensionado para cima e para baixo consoante a procura de recursos.

Os preços das cargas de trabalho adicionais que executa no cluster do seu ambiente seguem o modelo de preços do Cloud Composer 3 e usam SKUs do Cloud Composer 3.

O Cloud Composer 3 usa clusters do Autopilot que introduzem a noção de classes de computação:

  • O Cloud Composer só suporta a classe de computação general-purpose.

  • Por predefinição, se não for selecionada nenhuma classe, a classe general-purpose é assumida quando cria pods com o KubernetesPodOperator.

  • Cada classe está associada a propriedades específicas e limites de recursos. Pode ler sobre elas na documentação do Autopilot. Por exemplo, os pods que são executados na classe general-purpose podem usar até 110 GiB de memória.

Acesso aos recursos do projeto

No Cloud Composer 3, o cluster do seu ambiente está localizado no projeto de inquilino e não é possível configurá-lo. Os pods são executados no cluster do ambiente, num espaço de nomes isolado.

No Cloud Composer 3, os pods são sempre executados no namespace composer-user-workloads, mesmo que seja especificado um namespace diferente. Os pods neste espaço de nomes podem aceder a Google Cloud recursos no seu projeto e na sua rede VPC (se estiver ativada) sem configuração adicional. A conta de serviço do seu ambiente é usada para aceder a estes recursos. Não é possível especificar uma conta de serviço diferente.

Configuração mínima

Para criar um KubernetesPodOperator, apenas são necessários os parâmetros name, image e task_id do Pod a usar. O /home/airflow/composer_kube_config contém credenciais para autenticar no GKE.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuração adicional

Este exemplo mostra parâmetros adicionais que pode configurar no KubernetesPodOperator.

Consulte os seguintes recursos para mais informações:

  • Para ver informações sobre a utilização de segredos e mapas de configuração do Kubernetes, consulte o artigo Use segredos e mapas de configuração do Kubernetes.

  • Para obter informações sobre a utilização de modelos Jinja com o KubernetesPodOperator, consulte o artigo Utilize modelos Jinja.

  • Para ver informações sobre os valores suportados para requisitos de recursos (CPU, memória e armazenamento), consulte o artigo Requisitos de recursos.

  • Para obter informações sobre os parâmetros KubernetesPodOperator, consulte a referência do operador na documentação do Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Use modelos Jinja

O Airflow suporta modelos Jinja em DAGs.

Tem de declarar os parâmetros do Airflow necessários (task_id, name e image) com o operador. Conforme mostrado no exemplo seguinte, pode usar modelos para todos os outros parâmetros com o Jinja, incluindo cmds, arguments, env_vars e config_file.

O parâmetro env_vars no exemplo é definido a partir de uma variável do Airflow denominada my_value. O DAG de exemplo obtém o respetivo valor da variável de modelo vars no Airflow. O Airflow tem mais variáveis que dão acesso a diferentes tipos de informações. Por exemplo, pode usar a variável de modelo conf para aceder aos valores das opções de configuração do Airflow. Para mais informações e a lista de variáveis disponíveis no Airflow, consulte a referência de modelos na documentação do Airflow.

Sem alterar o DAG nem criar a variável env_vars, a tarefa ex-kube-templates no exemplo falha porque a variável não existe. Crie esta variável na IU do Airflow ou com a CLI Google Cloud:

IU do Airflow

  1. Aceda à IU do Airflow.

  2. Na barra de ferramentas, selecione Administração > Variáveis.

  3. Na página Variável de lista, clique em Adicionar um novo registo.

  4. Na página Adicionar variável, introduza as seguintes informações:

    • Tecla:my_value
    • Val: example_value
  5. Clique em Guardar.

gcloud

Introduza o seguinte comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Substituir:

  • ENVIRONMENT com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.

O exemplo seguinte demonstra como usar modelos Jinja com KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Use segredos e ConfigMaps do Kubernetes

Um segredo do Kubernetes é um objeto que contém dados confidenciais. Um ConfigMap do Kubernetes é um objeto que contém dados não confidenciais em pares de chave/valor.

No Cloud Composer 3, pode criar segredos e ConfigMaps através da CLI do Google Cloud, da API ou do Terraform e, em seguida, aceder aos mesmos a partir do KubernetesPodOperator:

  • Com a CLI e a API Google Cloud, fornece um ficheiro de configuração YAML.
  • Com o Terraform, define Secrets e ConfigMaps como recursos separados nos ficheiros de configuração do Terraform.

Acerca dos ficheiros de configuração YAML

Quando cria um segredo do Kubernetes ou um ConfigMap através da Google Cloud CLI e da API, fornece um ficheiro no formato YAML. Este ficheiro tem de seguir o mesmo formato usado pelos segredos e pelos mapas de configuração do Kubernetes. A documentação do Kubernetes fornece muitos exemplos de código de ConfigMaps e Secrets. Para começar, pode consultar a página Distribua credenciais de forma segura através de segredos e ConfigMaps.

Tal como nos segredos do Kubernetes, use a representação base64 quando definir valores nos segredos.

Para codificar um valor, pode usar o seguinte comando (esta é uma das muitas formas de obter um valor codificado em base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Saída:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Os dois exemplos de ficheiros YAML seguintes são usados em exemplos mais adiante neste guia. Exemplo de ficheiro de configuração YAML para um secret do Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Outro exemplo que demonstra como incluir ficheiros. Tal como no exemplo anterior, primeiro codifique o conteúdo de um ficheiro (cat ./key.json | base64) e, em seguida, faculte este valor no ficheiro YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Um exemplo de um ficheiro de configuração YAML para um ConfigMap. Não precisa de usar a representação base64 em ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Faça a gestão dos segredos do Kubernetes

gcloud

Crie um segredo

Para criar um segredo do Kubernetes, execute o seguinte comando:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • SECRET_FILE: caminho para um ficheiro YAML local que contém a configuração do segredo.

Exemplo:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

Atualize um segredo

Para atualizar um segredo do Kubernetes, execute o seguinte comando. O nome do segredo é retirado do ficheiro YAML especificado e o conteúdo do segredo é substituído.

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • SECRET_FILE: caminho para um ficheiro YAML local que contém a configuração do segredo. Especifique o nome do segredo no campo metadata > name neste ficheiro.

List Secrets

Para obter uma lista de segredos e respetivos campos para um ambiente, execute o seguinte comando. Os valores das chaves na saída são substituídos por asteriscos.

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

Obtenha os detalhes do Secret

Para obter informações detalhadas sobre um segredo, execute o seguinte comando. Os valores das chaves na saída são substituídos por asteriscos.

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Substitua o seguinte:

  • SECRET_NAME: o nome do segredo, conforme foi definido no campo metadata > name no ficheiro YAML com a configuração do segredo.
  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

Elimine um segredo

Para eliminar um segredo, execute o seguinte comando:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME: o nome do segredo, conforme foi definido no campo metadata > name no ficheiro YAML com a configuração do segredo.
  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

API

Crie um segredo

  1. Crie um pedido environments.userWorkloadsSecrets.create API.

  2. Neste pedido:

    1. No corpo do pedido, no campo name, especifique o URI do novo segredo.
    2. No corpo do pedido, no campo data, especifique chaves e valores codificados em base64 para o segredo.

Exemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

Atualize um segredo

  1. Crie um pedido environments.userWorkloadsSecrets.update API.

  2. Neste pedido:

    1. No corpo do pedido, no campo name, especifique o URI do segredo.
    2. No corpo do pedido, no campo data, especifique chaves e valores codificados em base64 para o segredo. Os valores são substituídos.

Exemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

List Secrets

Crie um pedido da API environments.userWorkloadsSecrets.list. Os valores das chaves na saída são substituídos por asteriscos. É possível usar a paginação com este pedido. Consulte a referência do pedido para ver mais detalhes.

Exemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

Obtenha os detalhes do Secret

Crie um pedido da API environments.userWorkloadsSecrets.get. Os valores das chaves na saída são substituídos por asteriscos.

Exemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Elimine um segredo

Crie um pedido environments.userWorkloadsSecrets.delete API.

Exemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

O recurso google_composer_user_workloads_secret define um secret do Kubernetes, com chaves e valores definidos no bloco data.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: o nome do recurso do ambiente, que contém a definição do ambiente no Terraform. O nome do ambiente real também é especificado neste recurso.
  • LOCATION: a região onde o ambiente está localizado.
  • SECRET_NAME: o nome do segredo.
  • KEY_NAME: uma ou mais chaves para este segredo.
  • KEY_VALUE: valor codificado em base64 para a chave. Pode usar a função base64encode para codificar o valor (consulte o exemplo).

Os dois exemplos seguintes de segredos do Kubernetes são usados em exemplos mais adiante neste guia.

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

Outro exemplo que demonstra como incluir ficheiros. Pode usar a função file para ler o conteúdo do ficheiro como uma string e, em seguida, codificá-lo em base64:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

Use segredos do Kubernetes nos seus DAGs

Este exemplo mostra duas formas de usar segredos do Kubernetes: como uma variável de ambiente e como um volume montado pelo pod.

O primeiro secret, airflow-secrets, está definido como uma variável de ambiente do Kubernetes denominada SQL_CONN (em oposição a uma variável de ambiente do Airflow ou do Cloud Composer).

O segundo segredo, service-account, monta service-account.json, um ficheiro com um token de conta de serviço, em /var/secrets/google.

Veja o aspeto dos objetos secretos:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

O nome do primeiro segredo do Kubernetes é definido na variável secret_env. Este Secret tem o nome airflow-secrets. O parâmetro deploy_type especifica que tem de ser exposto como uma variável de ambiente. O nome da variável de ambiente é SQL_CONN, conforme especificado no parâmetro deploy_target. Por último, o valor da variável de ambiente SQL_CONN é definido para o valor da chave sql_alchemy_conn.

O nome do segundo secret do Kubernetes é definido na variável secret_volume. Este Secret tem o nome service-account. É exposto como um volume, conforme especificado no parâmetro deploy_type. O caminho do ficheiro a montar, deploy_target, é /var/secrets/google. Por fim, o key do segredo armazenado no deploy_target é service-account.json.

Veja o aspeto da configuração do operador:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Faça a gestão dos ConfigMaps do Kubernetes

gcloud

Crie um ConfigMap

Para criar um ConfigMap, execute o seguinte comando:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • CONFIG_MAP_FILE: caminho para um ficheiro YAML local que contém a configuração do ConfigMap.

Exemplo:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

Atualize um ConfigMap

Para atualizar um ConfigMap, execute o seguinte comando. O nome do ConfigMap é retirado do ficheiro YAML especificado e o conteúdo do ConfigMap é substituído.

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.
  • CONFIG_MAP_FILE: caminho para um ficheiro YAML local que contém a configuração do ConfigMap. Especifique o nome do ConfigMap no campo metadata > name neste ficheiro.

List ConfigMaps

Para obter uma lista de ConfigMaps e respetivos campos para um ambiente, execute o seguinte comando. Os valores-chave na saída são apresentados tal como estão.

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Substitua o seguinte:

  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

Obtenha os detalhes do ConfigMap

Para obter informações detalhadas sobre um ConfigMap, execute o seguinte comando. Os valores das chaves no resultado são apresentados tal como estão.

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

Substitua o seguinte:

  • CONFIG_MAP_NAME: o nome do ConfigMap, conforme foi definido no campo metadata > name no ficheiro YAML com a configuração do ConfigMap.
  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

Elimine um ConfigMap

Para eliminar um ConfigMap, execute o seguinte comando:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME: o nome do ConfigMap, conforme foi definido no campo metadata > name no ficheiro YAML com a configuração do ConfigMap.
  • ENVIRONMENT_NAME: o nome do seu ambiente.
  • LOCATION: a região onde o ambiente está localizado.

API

Crie um ConfigMap

  1. Crie um pedido de API.environments.userWorkloadsConfigMaps.create

  2. Neste pedido:

    1. No corpo do pedido, no campo name, especifique o URI do novo ConfigMap.
    2. No corpo do pedido, no campo data, especifique chaves e valores para o ConfigMap.

Exemplo:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

Atualize um ConfigMap

  1. Crie um pedido de API.environments.userWorkloadsConfigMaps.update

  2. Neste pedido:

    1. No corpo do pedido, no campo name, especifique o URI do ConfigMap.
    2. No corpo do pedido, no campo data, especifique chaves e valores para o ConfigMap. Os valores são substituídos.

Exemplo:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

List ConfigMaps

Crie um pedido environments.userWorkloadsConfigMaps.list API. As chaves-valores no resultado são apresentadas tal como estão. É possível usar a paginação com este pedido. Consulte a referência do pedido para ver mais detalhes.

Exemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

Obtenha os detalhes do ConfigMap

Crie um pedido environments.userWorkloadsConfigMaps.get API. Os valores-chave na saída são apresentados tal como estão.

Exemplo:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Elimine um ConfigMap

Crie um pedido environments.userWorkloadsConfigMaps.delete API.

Exemplo:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

O recurso google_composer_user_workloads_config_map define um ConfigMap, com chaves e valores definidos no bloco data.

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME: o nome do recurso do ambiente, que contém a definição do ambiente no Terraform. O nome do ambiente real também é especificado neste recurso.
  • LOCATION: a região onde o ambiente está localizado.
  • CONFIG_MAP_NAME: o nome do ConfigMap.
  • KEY_NAME: uma ou mais chaves para este ConfigMap.
  • KEY_VALUE: valor da chave.

Exemplo:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

Use ConfigMaps nos seus DAGs

Este exemplo mostra como usar ConfigMaps nos seus DAGs.

No exemplo seguinte, é transmitido um ConfigMap no parâmetro configmaps. Todas as chaves deste ConfigMap estão disponíveis como variáveis de ambiente:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

O exemplo seguinte mostra como montar um ConfigMap como um volume:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

Informações sobre o fornecedor do Kubernetes da CNCF

O KubernetesPodOperator é implementado no fornecedor apache-airflow-providers-cncf-kubernetes.

Para ver notas de lançamento detalhadas do fornecedor do CNCF Kubernetes, consulte o Website do fornecedor do CNCF Kubernetes.

Requisitos de recursos

O Cloud Composer 3 suporta os seguintes valores para os requisitos de recursos. Para ver um exemplo de utilização dos requisitos de recursos, consulte a secção Configuração adicional.

Recurso Mínimo Máximo Passo
CPU 0,25 32 Valores de passo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, …, 32. Os valores pedidos são arredondados para cima para o valor de passo suportado mais próximo (por exemplo, 5 para 6).
Memória 2G (GB) 128 GB Valores de passo: 2, 3, 4, 5, …, 128. Os valores pedidos são arredondados para o valor de passo suportado mais próximo (por exemplo, 3,5G para 4G).
Armazenamento - 100G (GB) Qualquer valor. Se forem pedidos mais de 100 GB, só são disponibilizados 100 GB.

Para mais informações sobre as unidades de recursos no Kubernetes, consulte o artigo Unidades de recursos no Kubernetes.

Resolução de problemas

Esta secção oferece conselhos para resolver problemas comuns do KubernetesPodOperator:

Ver registos

Ao resolver problemas, pode verificar os registos pela seguinte ordem:

  1. Registos de tarefas do Airflow:

    1. Na Google Cloud consola, aceda à página Ambientes.

      Aceder a Ambientes

    2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

    3. Aceda ao separador DAGs.

    4. Clique no nome do DAG e, de seguida, clique na execução do DAG para ver os detalhes e os registos.

  2. Registos do programador do Airflow:

    1. Aceda à página Detalhes do ambiente.

    2. Aceda ao separador Registos.

    3. Inspecione os registos do programador do Airflow.

  3. Registos de cargas de trabalho do utilizador:

    1. Aceda à página Detalhes do ambiente.

    2. Aceda ao separador Monitorização.

    3. Selecione User Workloads.

    4. Inspecione a lista de cargas de trabalho executadas. Pode ver os registos e as informações de utilização de recursos para cada carga de trabalho.

Códigos de retorno diferentes de zero

Quando usar o KubernetesPodOperator (e o GKEStartPodOperator), o código de retorno do ponto de entrada do contentor determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam uma falha.

Um padrão comum é executar um script de shell como o ponto de entrada do contentor para agrupar várias operações no contentor.

Se estiver a escrever um script deste tipo, recomendamos que inclua o comando set -e na parte superior do script para que os comandos com falhas no script terminem o script e propaguem a falha para a instância da tarefa do Airflow.

Limites de tempo de agrupamentos

O limite de tempo predefinido para KubernetesPodOperator é de 120 segundos, o que pode resultar em limites de tempo que ocorrem antes da transferência de imagens maiores. Pode aumentar o tempo limite alterando o parâmetro startup_timeout_seconds quando cria o KubernetesPodOperator.

Quando um pod excede o limite de tempo, o registo específico da tarefa está disponível na IU do Airflow. Por exemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Os tempos limite dos pods também podem ocorrer quando a conta de serviço do Cloud Composer não tem as autorizações do IAM necessárias para realizar a tarefa em questão. Para verificar isto, consulte os erros ao nível do pod através dos painéis de controlo do GKE para ver os registos da sua carga de trabalho específica ou use o Cloud Logging.

As tarefas KubernetesPodOperator falham quando é executado um grande número de tarefas

Quando o seu ambiente executa um grande número de tarefas KubernetesPodOperator ou KubernetesExecutor em simultâneo, o Cloud Composer 3 não aceita novas tarefas até que algumas das tarefas existentes estejam concluídas.

Para mais informações sobre a resolução de problemas, consulte o artigo Resolva problemas de tarefas do KubernetesExecutor.

O que se segue?