Use o KubernetesPodOperator

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Esta página descreve como usar o KubernetesPodOperator para implantar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine que faz parte do seu ambiente do Cloud Composer.

O KubernetesPodOperator inicia pods do Kubernetes no cluster do ambiente. Em comparação, Os operadores do Google Kubernetes Engine executam pods do Kubernetes em uma que pode ser um cluster separado, não relacionado ao seu de nuvem. Também é possível criar e excluir clusters usando operadores do Google Kubernetes Engine.

O KubernetesPodOperator é uma boa opção se você precisar de:

  • Dependências personalizadas do Python que não estão disponíveis no repositório PyPI público.
  • Dependências binárias que não estão disponíveis na imagem do worker do Cloud Composer.

Antes de começar

Configurar os recursos do ambiente do Cloud Composer

Ao criar um ambiente do Cloud Composer, você especifica os parâmetros de desempenho dele, incluindo os parâmetros de desempenho para o cluster do ambiente. Iniciar pods do Kubernetes no cluster de ambiente pode causar competição por recursos do cluster, como CPU ou memória. Como o programador e os workers do Airflow estão no mesmo cluster do GKE, eles não funcionarão corretamente, se a competição resultar na falta de recursos.

Para evitar a privação de recursos, realize uma ou mais das seguintes ações:

Crie um pool de nós.

A melhor forma de evitar a privação de recursos em do ambiente do Cloud Composer é criar um pool de nós e configurar pods do Kubernetes para serem executados usando apenas recursos desse pool.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Clique no nome do seu ambiente.

  3. Na página Detalhes do ambiente, acesse a guia Configuração do ambiente.

  4. Em Recursos > Cluster do GKE siga o link ver detalhes do cluster.

  5. Crie um pool de nós conforme descrito em Como adicionar um pool de nós.

gcloud

  1. Determine o nome do cluster de ambiente:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado;
  2. A saída contém o nome do cluster do ambiente. Por exemplo, pode ser europe-west3-example-enviro-af810e25-gke.

  3. Crie um pool de nós conforme descrito em Como adicionar um pool de nós.

Aumentar o número de nós no ambiente

O aumento do número de nós no ambiente do Cloud Composer aumenta a capacidade de computação disponível para as cargas de trabalho. Esse aumento não fornece outros recursos para tarefas que exigem mais CPU ou RAM do que o tipo de máquina especificado fornece.

Para aumentar a contagem de nós, atualize o ambiente.

Especificar o tipo de máquina apropriado

Durante a criação do ambiente do Cloud Composer, é possível especificar um tipo de máquina. Para garantir que hajam recursos disponíveis, especifique um tipo de máquina para o tipo de computação que ocorrer no ambiente do Cloud Composer.

Configuração mínima

Para criar um KubernetesPodOperator, somente o name e o image do pod são usados. Os parâmetros task_id são obrigatórios. O /home/airflow/composer_kube_config contém credenciais para autenticação no GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configuração de afinidade de pods

Ao configurar o parâmetro affinity no KubernetesPodOperator, você controlar em quais nós programar pods, como nós apenas em um determinado no pool de nós. Neste exemplo, o operador é executado somente nos pools de nós chamados pool-0 e pool-1. Os nós do ambiente do Cloud Composer 1 estão no default-pool. Portanto, os pods não são executados nos nós do ambiente.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Como o exemplo está configurado, a tarefa falha. Se você analisar os registros, a tarefa vai falhar porque os pools de nós pool-0 e pool-1 não existem.

Para garantir que os pools de nós em values existam, faça uma destas alterações de configuração:

  • Caso um pool de nós tenha sido criado anteriormente, substitua pool-0 e pool-1 pelos nomes dos pools de nós e faça o upload do DAG novamente.

  • Crie um pool de nós chamado pool-0 ou pool-1. É possível criar ambos, mas a tarefa precisa de apenas um para ser bem-sucedida.

  • Substitua pool-0 e pool-1 por default-pool, que é o pool padrão usado pelo Airflow. Em seguida, faça o upload do DAG novamente.

Depois de fazer as alterações, aguarde alguns minutos para que o ambiente seja atualizado. Em seguida, execute a tarefa ex-pod-affinity novamente e verifique se a tarefa ex-pod-affinity foi bem-sucedida.

Configurações avançadas

Este exemplo mostra outros parâmetros que você pode configurar no KubernetesPodOperator.

Para mais informações sobre parâmetros, consulte a referência do Airflow para KubernetesPodOperator. Para informações sobre como usar secrets e ConfigMaps, consulte Usar Secrets e ConfigMaps do Kubernetes. Para informações sobre o uso de modelos Jinja com o KubernetesPodOperator, consulte Use modelos Jinja.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Usar modelos Jinja

O Airflow é compatível com modelos Jinja em DAGs.

É preciso declarar os parâmetros obrigatórios do Airflow (task_id, name e image) com o operador. Como mostra o exemplo abaixo, Você pode criar modelos para todos os outros parâmetros com Jinja, incluindo cmds, arguments, env_vars e config_file.

O parâmetro env_vars no exemplo é definido por uma Variável do Airflow chamada my_value. O DAG de exemplo pega o valor da variável de modelo vars no Airflow. O Airflow tem mais variáveis que fornecem acesso a diferentes tipos de informações. Por exemplo: use a variável de modelo conf para acessar os valores de Opções de configuração do Airflow. Para mais informações e a de variáveis disponíveis no Airflow, consulte Referência de modelos no Airflow na documentação do Google Cloud.

Sem alterar o DAG ou criar a variável env_vars, a tarefa ex-kube-templates no exemplo falha porque a variável não existe. Crie essa variável na interface do Airflow ou com a Google Cloud CLI:

IU do Airflow

  1. Acesse a IU do Airflow.

  2. Na barra de ferramentas, selecione Administrador > Variáveis.

  3. Na página Listar variáveis, clique em Adicionar um novo registro.

  4. Na página Adicionar variável, digite as seguintes informações:

    • Chave: my_value
    • Val: example_value
  5. Clique em Salvar.

Se o ambiente usa o Airflow 1, execute o seguinte comando:

  1. Acesse a IU do Airflow.

  2. Na barra de ferramentas, selecione Administrador > Variáveis.

  3. Na página Variáveis, clique na guia Criar.

  4. Na página Variável, digite as seguintes informações:

    • Chave: my_value
    • Val: example_value
  5. Clique em Save.

gcloud

Digite este comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Se o ambiente usa o Airflow 1, execute o seguinte comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Substitua:

  • ENVIRONMENT pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;

O exemplo a seguir demonstra como usar modelos Jinja com KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Usar secrets e ConfigMaps do Kubernetes

Um secret do Kubernetes é um objeto que contém dados sensíveis. Um ConfigMap do Kubernetes é um objeto que contém dados não confidenciais em pares de chave-valor.

No Cloud Composer 2, é possível criar segredos e ConfigMaps usando a CLI, a API ou o Terraform do Google Cloud e acessá-los no KubernetesPodOperator.

Sobre os arquivos de configuração YAML

Quando você cria um secret ou ConfigMap do Kubernetes usando a Google Cloud CLI e API, você fornece um arquivo no formato YAML. Este arquivo deve seguir a mesma conforme usado pelos Secrets e ConfigMaps do Kubernetes. A documentação do Kubernetes oferece muitos exemplos de código de ConfigMaps e Secrets. Para começar, você pode consulte Distribuir credenciais com segurança usando secrets e ConfigMaps.

Igual ao Secrets do Kubernetes, use a base64 representação ao definir valores em Secrets.

Para codificar um valor, use o comando a seguir, que é uma das muitas maneiras de conseguir um valor codificado em base64:

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Saída:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Os dois exemplos de arquivo YAML a seguir são usados em exemplos mais adiante neste guia. Exemplo de arquivo de configuração YAML para um secret do Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Outro exemplo que demonstra como incluir arquivos. As mesmas informações exemplo, codifique o conteúdo de um arquivo (cat ./key.json | base64) e depois insira este valor no arquivo YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Um exemplo de arquivo de configuração YAML para um ConfigMap. Não é necessário usar a representação base64 em ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Gerenciar secrets do Kubernetes

No Cloud Composer 2, você cria segredos usando a CLI do Google Cloud e kubectl:

  1. Receba informações sobre o cluster do ambiente:

    1. Execute este comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Substitua:

      • ENVIRONMENT pelo nome do ambiente.
      • LOCATION é a região em que o ambiente do Cloud Composer está localizado.

      A saída desse comando usa o seguinte formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Para receber o ID do cluster do GKE, copie a saída depois de /clusters/ (termina em -gke).

    3. Para ver a zona, copie a saída após /zones/.

  2. Conecte-se ao cluster do GKE com o seguinte comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Substitua:

    • CLUSTER_ID: o ID do cluster do ambiente.
    • PROJECT_ID: o ID do projeto.
    • ZONE pela zona em que o cluster do ambiente está localizado.
  3. Crie secrets do Kubernetes:

    Os comandos a seguir demonstram duas abordagens diferentes para criar secrets do Kubernetes. A abordagem --from-literal usa pares de chave-valor. A abordagem --from-file usa conteúdos de arquivos.

    • Para criar um secret do Kubernetes fornecendo pares de chave-valor, execute o comando a seguir. Este exemplo cria um secret chamado airflow-secrets que tem um campo sql_alchemy_conn com o valor de test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Para criar uma chave secreta do Kubernetes fornecendo o conteúdo do arquivo, execute o comando a seguir. Este exemplo cria um Secret chamado service-account que tem o campo service-account.json com o valor extraído do conteúdo de um arquivo ./key.json local.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Usar os secrets do Kubernetes nos DAGs

Este exemplo mostra duas maneiras de usar os Secrets do Kubernetes: como uma variável de ambiente e como um volume montado pelo pod.

O primeiro Secret, airflow-secrets, foi definido a uma variável de ambiente do Kubernetes chamada SQL_CONN (em oposição a uma variável de ambiente do Airflow ou do Cloud Composer).

O segundo secret, service-account, instala service-account.json, um arquivo com um token de conta de serviço, em /var/secrets/google.

Confira como são os objetos Secret:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

O nome do primeiro secret do Kubernetes é definido na variável secret_env. Esse secret é chamado de airflow-secrets. O parâmetro deploy_type especifica que ele precisa ser exposto como uma variável de ambiente. O nome da variável de ambiente é SQL_CONN, conforme especificado no parâmetro deploy_target. Por fim, o valor da variável de ambiente SQL_CONN é definido como o valor da chave sql_alchemy_conn.

O nome do segundo secret do Kubernetes é definido na variável secret_volume. Esse secret é chamado de service-account. Ele é exposto como um volume, conforme especificado no parâmetro deploy_type. O caminho do arquivo a ser ativado, deploy_target, é /var/secrets/google. Por fim, o key do O secret armazenado em deploy_target é service-account.json.

Veja como é a configuração do operador:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informações sobre o provedor do Kubernetes da CNCF

KubernetesPodOperator é implementado apache-airflow-providers-cncf-kubernetes provedor.

Para ver as notas de lançamento detalhadas do provedor do Kubernetes do CNCF, acesse o site do provedor do Kubernetes do CNCF (em inglês).

Versão 6.0.0

Na versão 6.0.0 do pacote do provedor do Kubernetes do CNCF, a conexão kubernetes_default é usada por padrão no KubernetesPodOperator.

Se você especificou uma conexão personalizada na versão 5.0.0, essa conexão personalizada ainda é usada pelo operador. Para voltar a usar a conexão kubernetes_default, ajuste seus DAGs de acordo com isso.

Versão 5.0.0

Esta versão apresenta algumas mudanças incompatíveis com versões anteriores em comparação com a versão 4.4.0. As mais importantes estão relacionadas à conexão kubernetes_default, que não é usada na versão 5.0.0.

  • A conexão kubernetes_default precisa ser modificada. Configuração do Kubernetes o caminho precisa ser definido como /home/airflow/composer_kube_config conforme mostrado na figura a seguir. Como alternativa, config_file precisa ser adicionado à configuração do KubernetesPodOperator, conforme mostrado no exemplo de código abaixo.
Campo de caminho de configuração do Kube na interface do Airflow
Figura 1. Interface do Airflow, modificando a conexão kubernetes_default (clique para ampliar)
  • Modifique o código de uma tarefa usando o KubernetesPodOperator da seguinte maneira:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para mais informações sobre a versão 5.0.0, consulte as Notas da versão do provedor do Kubernetes do CNCF.

Solução de problemas

Nesta seção, fornecemos orientações para solucionar problemas comuns do KubernetesPodOperator problemas:

Ver registros

Ao solucionar problemas, verifique os registros na seguinte ordem:

  1. Registros de tarefas do Airflow:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

    3. Acesse a guia DAGs.

    4. Clique no nome do DAG e, em seguida, na execução do DAG para conferir os detalhes e os registros.

  2. Registros do programador do Airflow:

    1. Acesse a página Detalhes do ambiente.

    2. Acesse a guia Registros.

    3. Inspecione os registros do agendador do Airflow.

  3. Registros do pod no console do Google Cloud, em GKE do Google Cloud. Esses registros incluem o arquivo YAML de definição do pod, os eventos do pod e Detalhes do pod.

Códigos de retorno diferentes de zero

Ao usar o KubernetesPodOperator (e o GKEStartPodOperator), o código de retorno do ponto de entrada do contêiner determina se a tarefa considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam a falha.

Um padrão comum é executar um script de shell como o ponto de entrada do contêiner para agrupam várias operações dentro do contêiner.

Se você estiver escrevendo esse script, recomendamos que inclua os set -e na parte de cima do script para que os comandos com falha no script encerrar o script e propagar a falha para a instância de tarefa do Airflow.

Tempos limite do pod

O tempo limite padrão do KubernetesPodOperator é de 120 segundos, podem resultar em tempos limite atingidos antes do download de imagens maiores. Você pode aumente o tempo limite mudando o parâmetro startup_timeout_seconds quando você cria o KubernetesPodOperator.

Quando um pod expira, o registro específico da tarefa fica disponível a interface do Airflow. Exemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Os tempos limite do pod também podem ocorrer Conta de serviço do Cloud Composer não tem as permissões de IAM necessárias para executar a tarefa em mão. Para verificar isso, analise os erros no nível do pod usando os painéis do GKE para analisar os registros de uma carga de trabalho específica ou use o Cloud Logging.

Falha ao estabelecer uma nova conexão

O upgrade automático é ativado por padrão nos clusters do GKE. Se um pool de nós estiver em um cluster que está fazendo um upgrade, você poderá receber este erro:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar se o cluster está sendo atualizado, acesse o console do Google Cloud Clusters do Kubernetes e procure o ícone de carregamento ao lado do cluster do seu ambiente de execução.

A seguir