Como usar o KubernetesPodOperator

Esta página mostra como usar o KubernetesPodOperator para lançar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine, que faz parte do ambiente do Cloud Composer, e garantir que o ambiente tenha os recursos apropriados.

Recursos do ambiente do Cloud Composer no projeto de locatário e de cliente com a seta mostrando que os pods lançados estarão no mesmo Kubernetes Engine que os workers do Airflow, Redis, programador do Airflow e Proxy do Cloud SQL
Local de lançamento do Pod do Kubernetes do Cloud Composer (clique para ampliar)

O KubernetesPodOperator é uma boa opção, se você precisar de:

  • Dependências personalizadas do Python que não estão disponíveis no repositório PyPI público.
  • Dependências binárias que não estão disponíveis na imagem de worker no estoque do Cloud Composer.

Esta página apresenta um exemplo de DAG que inclui estas configurações KubernetesPodOperator:

Antes de começar

Como garantir os recursos adequados para o ambiente

Ao criar um ambiente do Cloud Composer, você especifica a quantidade de energia de computação para o ambiente. Uma determinada quantidade é alocada para o cluster do GKE. Iniciar os pods do Kubernetes no ambiente pode causar uma competição entre programas por recursos, como a CPU ou a memória. Como o programador e os workers do Airflow estão no mesmo cluster do GKE, eles não funcionarão corretamente se a competição resultar na falta de recursos.

Para evitar a privação de recursos, realize uma ou mais das seguintes ações:

Criação do pool de nós

A maneira recomendável de evitar a privação de recursos no ambiente do Cloud Composer é criar um novo pool de nós e configurar os pods do Kubernetes para que sejam executados usando somente os recursos desse pool.

Para criar um pool de nós em um cluster, execute estas etapas:

Console

  1. No Console do Cloud, acesse o menu do GKE.

    Acessar o menu do GKE

  2. Selecione o cluster desejado.

  3. Clique em Editar.

  4. Em Pools de nós, clique em Adicionar pool de nós.

  5. Configure o pool de nós.

  6. (Opcional) Ative as opções avançadas, como upgrades e escalonamento automáticos.

  7. Clique em Save.

gcloud

Digite o seguinte comando:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

em que:

  • POOL_NAME é o nome solicitado do pool de nós;
  • CLUSTER é o nome do cluster em que o pool de nós é criado;
  • PROJECT_ID é o nome do projeto do Cloud Composer;
  • ZONE é a zona em que o cluster do GKE está localizado.

    Para ver a lista de opções, consulte a documentação gcloud container node-pools create.

    Uma solicitação node-pools create bem-sucedida retorna as informações do pool de nós:

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

Como aumentar o número de nós no ambiente

O aumento do número de nós no ambiente do Cloud Composer aumenta a capacidade de computação disponível para os workers. Esse aumento não fornece outros recursos para tarefas que exigem mais CPU ou RAM do que o tipo de máquina especificado fornece.

Para aumentar a contagem de nós, atualize o ambiente.

Como especificar o tipo de máquina apropriado

Durante a criação do ambiente do Cloud Composer, é possível especificar um tipo de máquina. Para garantir que hajam recursos disponíveis, especifique o tipo de máquina ideal para o tipo de computação que ocorrerá no ambiente do Cloud Composer.

Configuração do KubernetesPodOperator

Para acompanhar este exemplo, coloque todo o arquivo kubernetes_pod_operator.py na pasta "dags/" do ambiente ou adicione o código KubernetesPodOperator relevante a um DAG.

As seções a seguir explicam cada configuração KubernetesPodOperator no exemplo. Para mais informações sobre cada variável de configuração, consulte a referência do Airflow.

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Configuração mínima

Para criar um KubernetesPodOperator, somente name, namespace, image e task_id são obrigatórios.

Quando você colocar o snippet de código a seguir em um DAG, a configuração usará os padrões em /home/airflow/composer_kube_config. Não é necessário modificar o código para que a tarefa pod-ex-minimum seja bem-sucedida.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

Configuração do modelo

O Airflow é compatível com o modelo Jinja. Você precisa declarar as variáveis necessárias (task_id, name, namespace e image) com o operador. Como mostra o exemplo a seguir, é possível criar modelos de todos os outros parâmetros com o Jinja, incluindo cmds, arguments, env_vars e config_file.

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

Sem alterar o DAG ou ambiente, a tarefa ex-kube-templates falha devido a dois erros. Vejamos como depurar a tarefa e resolver os erros.

  1. Verifique se a tarefa ex-kube-templates falhou.
  2. Verifique os registros de tarefas ex-kube-templates.

    Os registros mostram que a tarefa está falhando porque a variável apropriada não existe (my_value).

  3. Para definir my_value com gcloud ou a IU do Airflow:

    gcloud

    Digite o seguinte comando:

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    em que:

    • ENVIRONMENT é o nome do ambiente do Cloud Composer;
    • LOCATION é a região em que o ambiente do Cloud Composer está localizado.

    IU do Airflow

    1. Na barra de ferramentas, clique em Administrador > Variáveis.
    2. Clique em Criar.
    3. Digite as informações a seguir:
      • Chave: my_value
      • Val: example_value
    4. Clique em Save.
  4. Execute a tarefa ex-kube-templates novamente.

  5. Verifique o status da tarefa ex-kube-templates.

    A tarefa ex-kube-templates ainda está falhando. Ao olhar os registros, você verá que a tarefa falha porque core/kubeconfig não foi encontrado em config. Para se referir a um config_file personalizado (um arquivo de configuração do Kubernetes), é necessário definir a variável kube_config no arquivo airflow.cfg como uma configuração válida do Kubernetes.

  6. Para definir a variável kube_config, digite o seguinte comando:

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    em que:

    • ENVIRONMENT é o nome do ambiente do Cloud Composer;
    • LOCATION é a região em que o ambiente do Cloud Composer está localizado.
  7. Aguarde alguns minutos para que o ambiente seja atualizado.

  8. Execute a tarefa ex-kube-templates novamente

  9. Verifique se a tarefa ex-kube-templates foi bem-sucedida.

Configuração de variáveis secrets

Um secret do Kubernetes é um objeto que contém uma pequena quantidade de dados confidenciais. É possível transmitir secret para os pods do Kubernetes usando o KubernetesPodOperator. Os secrets precisam ser definidos no Kubernetes ou o pod não será iniciado.

Neste exemplo, implantamos o secret do Kubernetes, airflow-secrets, em uma variável de ambiente do Kubernetes chamada SQL_CONN (ao contrário de uma variável de ambiente do Airflow ou Cloud Composer).

Veja como é o secret:

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

O nome do secret do Kubernetes é definido na variável secret. Esse secret privado é chamado de airflow-secrets. Ele é exposto como uma variável de ambiente, conforme ditado por deploy_type. A variável de ambiente definida para ele, deploy_target, é SQL_CONN. Por fim, o key do secret que armazenamos em deploy_target é sql_alchemy_conn.

Veja como é a configuração do operador:

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})

Sem fazer alterações no DAG ou no ambiente, a tarefa ex-kube-secrets falha. Vejamos como depurar a tarefa e resolver os erros.

  1. Verifique se a tarefa ex-kube-secrets falhou.
  2. Verifique os registros de tarefas ex-kube-secrets.

    Ao olhar os registros, você verá que a tarefa falha devido a um erro Pod took too long to start. Esse erro ocorre porque o Airflow não encontrou o secret especificado na configuração, secret_env.

  3. Para definir o secret usando gcloud:

    1. Veja os detalhes do ambiente do Cloud Composer executando este comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      onde:

      • ENVIRONMENT é o nome do ambiente do Cloud Composer;
      • LOCATION é a região em que o ambiente do Cloud Composer está localizado.

        Saída semelhante a estas devoluções: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. Para ver o ID do cluster do GKE, copie e cole a saída depois de /clusters/ (termina em -gke) em algum lugar onde você possa acessá-la mais tarde. Essa saída é o ID cluster.

    3. Para ver a zona, copie e cole a saída depois de /zones/ em algum lugar onde você possa acessá-la mais tarde.

    4. Conecte-se ao cluster do GKE executando este comando:

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      em que:

      • CLUSTER_ID é o ID do cluster do GKE;
      • ZONE é a zona em que o GKE está localizado;
      • PROJECT é o ID do projeto do Google Cloud.
    5. Execute o seguinte comando para criar um secret do Kubernetes que defina o valor de sql_alchemy_conn para test_value:

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. Depois de definir o secret, execute a tarefa ex-kube-secrets novamente.

  5. Verifique se a tarefa ex-kube-secrets foi bem-sucedida.

Configuração de afinidade do Pod

Ao configurar o parâmetro affinity em KubernetesPodOperator, você controla em quais nós os pods serão programados, como, por exemplo, nós apenas em um determinado pool de nós. Neste exemplo, o operador é executado somente nos pools de nós chamados pool-0 e pool-1.

Os recursos do ambiente do Cloud Composer no projeto de locatário e de cliente com a seta mostrando que os pods lançados estarão no mesmo Kubernetes Engine que os workers do Airflow, Redis, programador do Airflow e Proxy do Cloud SQL. Porém, em pools específicos, pool-0 e pool-1, são mostrados como caixas separadas no Kubernetes Engine.
Local de lançamento do pod do Kubernetes do Cloud Composer com afinidade de pods (clique para ampliar)


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

Como o exemplo está configurado no momento, a tarefa falha. Vejamos como depurar a tarefa e resolver os erros.

  1. Verifique se a tarefa ex-pod-affinity falhou.
  2. Verifique os registros de tarefas ex-pod-affinity.

    Ao olhar os registros, você verá que a tarefa falha porque os pools de nós pool-0 e pool-1 não existem.

  3. Para garantir que os pools de nós em values existam, faça uma destas alterações de configuração:

    • Caso um pool de nós tenha sido criado anteriormente, substitua pool-0 e pool-1 pelos nomes dos pools de nós e faça o upload do DAG novamente.
    • Crie um pool de nós chamado pool-0 ou pool-1. É possível criar ambos, mas a tarefa precisa de apenas um para ser bem-sucedida.
    • Substitua pool-0 e pool-1 por default-pool, que é o pool padrão usado pelo Airflow. Em seguida, faça o upload do DAG novamente. Observação: por padrão, os pods do Kubernetes são programados em default-pool. Caso pools sejam adicionados mais tarde, os pools serão restritos a default-pool.
  4. Aguarde alguns minutos para que o ambiente seja atualizado.

  5. Execute a tarefa ex-pod-affinity novamente.

  6. Verifique se a tarefa ex-pod-affinity foi bem-sucedida.

Configuração completa

Este exemplo mostra todas as variáveis que você pode configurar em KubernetesPodOperator. Não é necessário modificar o código para que a tarefa ex-all-configs seja bem-sucedida.

Para ver detalhes sobre cada variável, consulte a referência KubernetesPodOperator do Airflow.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

Como gerenciar DAGs

Como visualizar o status de uma tarefa

  1. Acesse a interface da Web do Airflow.
  2. Na página DAGs, clique no nome do DAG, como, por exemplo, composer_sample_kubernetes_pod.
  3. Na página de detalhes dos DAGs, clique em Visualizar gráfico.
  4. Verificar status:

    • Falhou: a tarefa tem uma caixa vermelha ao redor dela (como ex-kube-templates). Também é possível manter o ponteiro sobre a tarefa e procurar por Estado: falhou.

    • Bem-sucedida: a tarefa tem uma caixa verde ao redor dela (como pod-ex-minimum). Também é possível manter o ponteiro sobre a tarefa e procurar por Estado: bem-sucedida.

Como verificar os registros das tarefas

  1. Na IU do Airflow, veja o status da tarefa.
  2. Na visualização do gráfico do DAG, clique no nome da tarefa.
  3. No menu de contexto da instância da tarefa, clique em Exibir registro.

Como executar uma tarefa novamente

  1. Para retornar ao DAG:
    1. Na barra de ferramentas da IU do Airflow, clique em DAGs.
    2. Clique no nome do DAG, como composer_samples_kubernetes_pod.
  2. Para executar a tarefa novamente:
    1. Clique no nome da tarefa.
    2. Clique em Limpar e em OK. A tarefa é executada novamente automaticamente.

Resolver problemas

Dicas para solucionar problemas de falhas no pod

Além de verificar os registros de tarefas, verifique também estes registros:

  • Saída do programador e dos workers do Airflow:

    1. Acesse o bucket do Cloud Storage do ambiente do Cloud Composer. Este é o bucket em que os DAGs estão localizados.
    2. Analise os registros em logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Registros detalhados do pod no Console do Cloud em cargas de trabalho do GKE. Esses registros incluem o arquivo YAML de definição, os eventos e os detalhes do pod.

Códigos de retorno diferente de zero quando também usar GKEPodOperator

Ao usar KubernetesPodOperator e GKEPodOperator, o código de retorno do ponto de entrada do contêiner determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam a falha.

Um padrão comum ao usar KubernetesPodOperator e GKEPodOperator é executar um script de shell como o ponto de entrada do contêiner para agrupar várias operações dentro do contêiner.

Se você estiver escrevendo esse script, recomendamos que inclua o comando set -e na parte superior para que comandos com falha encerrem o script e propaguem a falha para a instância de tarefa do Airflow.

A tarefa falha mesmo se o pod for bem-sucedido

Para ambientes do Cloud Composer que executam composer-1.4.1-airflow-* ou uma versão anterior:

Se uma tarefa do Airflow for executada por uma hora e os registros de tarefas terminarem com kubernetes.client.rest.ApiException: (401) e Reason: Unauthorized, o job do Kubernetes subjacente continuará sendo executado depois desse ponto e poderá até ser bem-sucedido. No entanto, o Airflow informa que a tarefa falhou.

Para corrigir esse problema, adicione uma dependência do pacote PyPI explícita em kubernetes>=8.0.1.

Tempos limite do pod

O tempo limite padrão para KubernetesPodOperator é de 120 segundos, o que pode resultar na ocorrência de tempos limite antes do download de imagens maiores. É possível aumentar o tempo limite alterando o parâmetro startup_timeout_seconds, quando você cria o KubernetesPodOperator.

Quando um pod expirar, o registro específico da tarefa estará disponível na IU da Web do Airflow. Exemplo:

Executing  on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Os tempos limite do pod também podem ocorrer quando a conta de serviço do Composer não tiver as permissões necessárias do IAM para executar a tarefa em questão. Para verificar isso, veja os erros no nível do pod usando os painéis do GKE para analisar os registros de uma carga de trabalho específica, ou use o Stackdriver Logging.

Falha ao estabelecer uma nova conexão

O upgrade automático é ativado por padrão nos clusters do GKE. Se um pool de nós estiver em um cluster que está fazendo um upgrade, você poderá receber este erro:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse

Para verificar se o cluster está fazendo um upgrade, acesse o menu do GKE no Console do Cloud e procure o ícone de carregamento ao lado do nome do cluster do ambiente.

Recursos relacionados