Use o KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como usar o KubernetesPodOperator para implantar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine, que faz parte do ambiente do Cloud Composer, e para garantir que ele tenha os recursos apropriados.

KubernetesPodOperator inicia pods do Kubernetes no cluster do ambiente. Os operadores do Google Kubernetes Engine executam pods do Kubernetes em um cluster especificado, que pode ser um cluster separado não relacionado ao seu ambiente. Também é possível criar e excluir clusters usando os operadores do Google Kubernetes Engine.

O KubernetesPodOperator é uma boa opção, se você precisar de:

  • Dependências personalizadas do Python que não estão disponíveis no repositório PyPI público.
  • Dependências binárias que não estão disponíveis na imagem do worker do Cloud Composer.

Nesta página, você verá um exemplo de DAG do Airflow que inclui as seguintes configurações de KubernetesPodOperator:

Antes de começar

  • No Cloud Composer 2, o cluster do ambiente é escalonado automaticamente. As cargas de trabalho extras que você executa usando KubernetesPodOperator são escalonadas independentemente do ambiente. Seu ambiente não é afetado pelo aumento da demanda de recursos, mas o cluster do ambiente aumenta ou diminui dependendo da demanda de recursos. Os preços das cargas de trabalho extras executadas no cluster do ambiente seguem o modelo de preços do Cloud Composer 2 e usam as SKUs do Cloud Composer Compute.

  • Os clusters do Cloud Composer 2 usam a Identidade da carga de trabalho. Por padrão, os pods executados em namespaces recém-criados ou o namespace composer-user-workloads não podem acessar os recursos do Google Cloud. Ao usar a Identidade da carga de trabalho, as contas de serviço do Kubernetes associadas a namespaces precisam ser mapeadas para contas de serviço do Google Cloud para permitir a autorização da identidade do serviço para solicitações às APIs do Google e outros serviços.

    Por isso, se você executar pods no namespace composer-user-workloads ou em um namespace recém-criado no cluster do ambiente, as vinculações adequadas do IAM entre as contas de serviço do Kubernetes e do Google Cloud não serão criadas e esses pods não poderão acessar os recursos do projeto do Google Cloud.

    Se você quiser que seus pods tenham acesso aos recursos do Google Cloud, use o namespace composer-user-workloads ou crie seu próprio namespace, conforme descrito mais adiante.

    Para fornecer acesso aos recursos do seu projeto, siga as orientações na Identidade da carga de trabalho e configure as vinculações:

    1. Crie um namespace separado no cluster do ambiente.
    2. Crie uma vinculação entre a conta de serviço do Kubernetes composer-user-workloads/<namespace_name> e a conta de serviço do ambiente.
    3. Adicione a anotação da conta de serviço do seu ambiente à conta de serviço do Kubernetes.
    4. Ao usar KubernetesPodOperator, especifique o namespace e a conta de serviço do Kubernetes nos parâmetros namespace e service_account_name.
  • Se a versão 5.0.0 do provedor de Kubernetes CNCF for usada, siga as instruções da seção sobre o provedor de Kubernetes CNCF.

  • O Cloud Composer 2 usa clusters do GKE com a Identidade da carga de trabalho. O servidor de metadados do GKE leva alguns segundos para começar a aceitar solicitações em um pod recém-criado. Portanto, as tentativas de autenticação usando a Identidade da carga de trabalho podem falhar nos primeiros segundos da vida útil de um pod. Clique neste link para saber mais sobre essa limitação.

  • O Cloud Composer 2 usa clusters do Autopilot que apresentam a noção de classes de computação. Por padrão, se nenhuma classe for selecionada, a classe general-purpose será presumida quando você criar pods usando KubernetesPodOperator.

    • Cada classe está associada a propriedades e limites de recursos específicos. Leia mais sobre isso na documentação do Autopilot. Por exemplo, os pods executados na classe general-purpose podem usar até 110 GiB de memória.

Configuração do KubernetesPodOperator

Para acompanhar este exemplo, coloque todo o arquivo kubernetes_pod_operator.py na pasta dags/ do ambiente ou adicione o código relevante KubernetesPodOperator a um DAG.

As seções a seguir explicam cada configuração KubernetesPodOperator no exemplo. Para mais informações sobre cada variável de configuração, consulte a referência do Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Configuração mínima

Para criar um KubernetesPodOperator, são necessários apenas o name, o namespace em que o pod é executado, o image para usar e o task_id do pod.

Quando você colocar este snippet de código em um DAG, a configuração usará os padrões em /home/airflow/composer_kube_config. Não é necessário modificar o código para que a tarefa pod-ex-minimum seja bem-sucedida.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Configuração do modelo

O Airflow é compatível com o modelo Jinja. Você precisa declarar as variáveis necessárias (task_id, name, namespace e image) com o operador. Como mostra o exemplo a seguir, é possível criar modelos de todos os outros parâmetros com o Jinja, incluindo cmds, arguments, env_vars e config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Sem alterar o DAG ou ambiente, a tarefa ex-kube-templates falha devido a dois erros. Os registros mostram que a tarefa está falhando porque a variável apropriada não existe (my_value). O segundo erro, que você pode receber após corrigir o primeiro erro, mostra que a tarefa falha porque core/kube_config não é encontrado em config.

Para corrigir os dois erros, siga as etapas descritas mais detalhadamente.

Para definir my_value com gcloud ou a IU do Airflow:

IU do Airflow

Na IU do Airflow 2:

  1. Acesse a IU do Airflow.

  2. Na barra de ferramentas, selecione Administrador > Variáveis.

  3. Na página Listar variáveis, clique em Adicionar um novo registro.

  4. Na página Adicionar variável, digite as seguintes informações:

    • Chave: my_value
    • Val: example_value
  5. Clique em Salvar.

gcloud

No Airflow 2, digite o seguinte comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Substitua:

  • ENVIRONMENT pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;

Para se referir a um config_file personalizado (um arquivo de configuração do Kubernetes), modifique a opção de configuração kube_config do Airflow para uma configuração válida do Kubernetes:

Seção Chave Valor
core kube_config /home/airflow/composer_kube_config

Aguarde alguns minutos para que o ambiente seja atualizado. Em seguida, execute a tarefa ex-kube-templates novamente e verifique se a tarefa ex-kube-templates é bem-sucedida.

Configuração de variáveis de secret

Um secret do Kubernetes é um objeto que contém dados confidenciais. É possível transmitir secrets para os pods do Kubernetes usando o KubernetesPodOperator. Os secrets precisam ser definidos no Kubernetes ou o pod não será iniciado.

Neste exemplo, mostramos duas maneiras de usar os Kubernetes Secrets: como uma variável de ambiente e como um volume ativado pelo pod.

O primeiro secret, airflow-secrets, é definido como uma variável de ambiente do Kubernetes chamada SQL_CONN (em vez de uma variável de ambiente do Airflow ou do Cloud Composer).

O segundo secret, service-account, instala service-account.json, um arquivo com um token de conta de serviço, em /var/secrets/google.

Veja a aparência dos secrets:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

O nome do primeiro secret do Kubernetes é definido na variável secret. Esse secret privado é chamado de airflow-secrets. Ele é exposto como uma variável de ambiente, conforme ditado por deploy_type. A variável de ambiente definida para, deploy_target, é SQL_CONN. Por fim, o key do secret armazenado no deploy_target é sql_alchemy_conn.

O nome do segundo secret do Kubernetes é definido na variável secret. Esse secret privado é chamado de service-account. Ele é exposto como um volume, conforme determinado pelo deploy_type. O caminho do arquivo a ser ativado, deploy_target, é /var/secrets/google. Por fim, o key do secret armazenado no deploy_target é service-account.json.

A configuração do operador vai ficar assim:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Sem fazer alterações no DAG ou no ambiente, a tarefa ex-kube-secrets falha. Se você analisar os registros, a tarefa falhará devido a um erro Pod took too long to start. Esse erro ocorre porque o Airflow não encontrou o secret especificado na configuração, secret_env.

gcloud

Para definir o secret usando gcloud:

  1. Receba informações sobre o cluster de ambiente do Cloud Composer.

    1. Execute este comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Substitua:

      • ENVIRONMENT pelo nome do ambiente.
      • LOCATION é a região em que o ambiente do Cloud Composer está localizado.

      A saída desse comando usa o seguinte formato: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>.

    2. Para receber o ID do cluster do GKE, copie a saída depois de /clusters/ (termina em -gke).

  2. Conecte-se ao cluster do GKE executando este comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    Substitua:

    • CLUSTER_ID pelo ID do cluster do GKE.
    • PROJECT pelo ID do seu projeto do Google Cloud.
    • LOCATION é a região em que o ambiente do Cloud Composer está localizado.

  3. Crie secrets do Kubernetes.

    1. Execute o seguinte comando para criar um secret do Kubernetes que defina o valor de sql_alchemy_conn para test_value:

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. Crie um secret do Kubernetes que defina o valor de service-account.json como um caminho local de um arquivo de chave da conta de serviço chamado key.json executando o seguinte comando:

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. Depois de definir os secrets, execute a tarefa ex-kube-secrets novamente na IU do Airflow.

  5. Verifique se a tarefa ex-kube-secrets foi bem-sucedida.

Configuração completa

Este exemplo mostra todas as variáveis que você pode configurar em KubernetesPodOperator. Não é necessário modificar o código para que a tarefa ex-all-configs seja bem-sucedida.

Para ver detalhes sobre cada variável, consulte a referência KubernetesPodOperator do Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Informações sobre o provedor de Kubernetes CNCF

GKEStartPodOperator e KubernetesPodOperator são implementados no provedor apache-airflow-providers-cncf-kubernetes.

Para ler as notas de versão com falha do provedor do Kubernetes CNCF, consulte o site do provedor do Kubernetes da CNCF.

Versão 6.0.0

Na versão 6.0.0 do pacote CNCF Kubernetes Provider, a conexão kubernetes_default é usada por padrão no KubernetesPodOperator.

Se você especificou uma conexão personalizada na versão 5.0.0, essa conexão personalizada ainda será usada pelo operador. Para voltar a usar a conexão kubernetes_default, faça o ajuste correto dos DAGs.

Versão 5.0.0

Esta versão apresenta algumas alterações incompatíveis com versões anteriores em comparação com a versão 4.4.0. Os mais importantes que você precisa saber estão relacionados à conexão kubernetes_default, que não é usada na versão 5.0.0.

  • A conexão kubernetes_default precisa ser modificada. O caminho de configuração do Kube precisa ser definido como /home/airflow/composer_kube_config (veja a Figura 1) ou, como alternativa, config_file precisa ser adicionado à configuração KubernetesPodOperator (como foi apresentado abaixo).
Campo de caminho de configuração do Kube na interface do Airflow
Figura 1. Interface do Airflow, modificando a conexão kubernetes_default (clique para ampliar)
  • Modifique o código de uma tarefa com o KubernetesPodOperator da seguinte maneira
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para mais informações sobre a versão 5.0.0, consulte as Notas de lançamento do provedor Kubernetes da CNCF.

Solução de problemas

Dicas para solucionar problemas de falhas no pod

Além de verificar os registros de tarefas na IU do Airflow, verifique também os seguintes registros:

  • Saída do programador e dos workers do Airflow:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Acesse o link DAGs para seu ambiente.

    3. No bucket do ambiente, suba um nível.

    4. Revise os registros na pasta logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Registros detalhados do pod no console do Google Cloud em "Cargas de trabalho do GKE". Esses registros incluem o arquivo YAML de definição, os eventos e os detalhes do pod.

Códigos de retorno diferente de zero quando também usar GKEStartPodOperator

Ao usar KubernetesPodOperator e GKEStartPodOperator, o código de retorno do ponto de entrada do contêiner determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam a falha.

Um padrão comum ao usar KubernetesPodOperator e GKEStartPodOperator é executar um script de shell como o ponto de entrada do contêiner para agrupar várias operações dentro do contêiner.

Se você estiver escrevendo esse script, recomendamos que inclua o comando set -e na parte superior para que comandos com falha encerrem o script e propaguem a falha para a instância de tarefa do Airflow.

Tempos limite do pod

O tempo limite padrão para KubernetesPodOperator é de 120 segundos, o que pode resultar na ocorrência de tempos limite antes do download de imagens maiores. É possível aumentar o tempo limite alterando o parâmetro startup_timeout_seconds, quando você cria o KubernetesPodOperator.

Quando um pod expira, o registro específico da tarefa fica disponível na IU do Airflow. Exemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Os tempos limite do pod também podem ocorrer quando a conta de serviço do Composer não tiver as permissões necessárias do IAM para executar a tarefa em questão. Para verificar isso, veja os erros no nível do pod usando os painéis do GKE para analisar os registros para uma carga de trabalho específica, ou use o Cloud Logging.

Falha ao estabelecer uma nova conexão

O upgrade automático é ativado por padrão nos clusters do GKE. Se um pool de nós estiver em um cluster que está fazendo um upgrade, você poderá receber este erro:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar se o cluster está sendo atualizado, no console do Google Cloud, acesse a página Clusters do Kubernetes e procure o ícone de carregamento ao lado do nome do cluster do ambiente.

A seguir