Use o KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar o KubernetesPodOperator para implementar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine que faz parte do seu ambiente do Cloud Composer.

O KubernetesPodOperator inicia pods do Kubernetes no cluster do seu ambiente. Em comparação, os operadores do Google Kubernetes Engine executam pods do Kubernetes num cluster especificado, que pode ser um cluster separado não relacionado com o seu ambiente. Também pode criar e eliminar clusters através dos operadores do Google Kubernetes Engine.

O KubernetesPodOperator é uma boa opção se precisar de:

  • Dependências personalizadas do Python que não estão disponíveis através do repositório público PyPI.
  • Dependências binárias que não estão disponíveis na imagem de trabalho do Cloud Composer padrão.

Antes de começar

Configure os recursos do ambiente do Cloud Composer

Quando cria um ambiente do Cloud Composer, especifica os respetivos parâmetros de desempenho, incluindo os parâmetros de desempenho do cluster do ambiente. O lançamento de pods do Kubernetes no cluster do ambiente pode causar concorrência pelos recursos do cluster, como a CPU ou a memória. Uma vez que o programador e os trabalhadores do Airflow estão no mesmo cluster do GKE, os programadores e os trabalhadores não funcionam corretamente se a concorrência resultar na falta de recursos.

Para evitar a escassez de recursos, tome uma ou mais das seguintes medidas:

Crie um node pool

A forma preferencial de evitar a escassez de recursos no ambiente do Cloud Composer é criar um novo conjunto de nós e configurar pods do Kubernetes para execução apenas com recursos desse conjunto.

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Clique no nome do seu ambiente.

  3. Na página Detalhes do ambiente, aceda ao separador Configuração do ambiente.

  4. Na secção Recursos > Cluster do GKE, siga o link ver detalhes do cluster.

  5. Crie um node pool conforme descrito em Adicionar um node pool.

gcloud

  1. Determine o nome do cluster do seu ambiente:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Substituir:

    • ENVIRONMENT_NAME com o nome do ambiente.
    • LOCATION com a região onde o ambiente está localizado.
  2. O resultado contém o nome do cluster do seu ambiente. Por exemplo, pode ser europe-west3-example-enviro-af810e25-gke.

  3. Crie um node pool conforme descrito em Adicionar um node pool.

Aumente o número de nós no seu ambiente

Aumentar o número de nós no seu ambiente do Cloud Composer aumenta a capacidade de computação disponível para as suas cargas de trabalho. Este aumento não fornece recursos adicionais para tarefas que requerem mais CPU ou RAM do que o tipo de máquina especificado.

Para aumentar a contagem de nós, atualize o seu ambiente.

Especifique o tipo de máquina adequado

Durante a criação do ambiente do Cloud Composer, pode especificar um tipo de máquina. Para garantir os recursos disponíveis, especifique um tipo de máquina para o tipo de computação que ocorre no seu ambiente do Cloud Composer.

Configuração mínima

Para criar um KubernetesPodOperator, apenas são necessários os parâmetros name, image e task_id do Pod a usar. O /home/airflow/composer_kube_config contém credenciais para autenticar no GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Fluxo de ar 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Configuração da afinidade de pods

Quando configura o parâmetro affinity no KubernetesPodOperator, controla em que nós agendar os pods, como nós apenas num node pool específico. Neste exemplo, o operador é executado apenas em conjuntos de nós denominados pool-0 e pool-1. Os nós do ambiente do Cloud Composer 1 estão no default-pool, pelo que os seus pods não são executados nos nós do seu ambiente.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Fluxo de ar 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Conforme o exemplo está configurado, a tarefa falha. Se analisar os registos, a tarefa falha porque os conjuntos de nós pool-0 e pool-1 não existem.

Para se certificar de que os agrupamentos de nós em values existem, faça qualquer uma das seguintes alterações de configuração:

  • Se criou um conjunto de nós anteriormente, substitua pool-0 e pool-1 pelos nomes dos seus conjuntos de nós e carregue novamente o DAG.

  • Crie um node pool com o nome pool-0 ou pool-1. Pode criar ambos, mas a tarefa só precisa de um para ser bem-sucedida.

  • Substitua pool-0 e pool-1 por default-pool, que é o conjunto predefinido que o Airflow usa. Em seguida, carregue novamente o DAG.

Depois de fazer as alterações, aguarde alguns minutos para que o ambiente seja atualizado. Em seguida, execute novamente a tarefa ex-pod-affinity e verifique se a tarefa ex-pod-affinity é bem-sucedida.

Configuração adicional

Este exemplo mostra parâmetros adicionais que pode configurar no KubernetesPodOperator.

Consulte os seguintes recursos para mais informações:

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Fluxo de ar 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Use modelos Jinja

O Airflow suporta modelos Jinja em DAGs.

Tem de declarar os parâmetros do Airflow necessários (task_id, name e image) com o operador. Conforme mostrado no exemplo seguinte, pode usar modelos para todos os outros parâmetros com o Jinja, incluindo cmds, arguments, env_vars e config_file.

O parâmetro env_vars no exemplo é definido a partir de uma variável do Airflow denominada my_value. O DAG de exemplo obtém o respetivo valor da variável de modelo vars no Airflow. O Airflow tem mais variáveis que dão acesso a diferentes tipos de informações. Por exemplo, pode usar a variável de modelo conf para aceder aos valores das opções de configuração do Airflow. Para mais informações e a lista de variáveis disponíveis no Airflow, consulte a referência de modelos na documentação do Airflow.

Sem alterar o DAG nem criar a variável env_vars, a tarefa ex-kube-templates no exemplo falha porque a variável não existe. Crie esta variável na IU do Airflow ou com a CLI Google Cloud:

IU do Airflow

  1. Aceda à IU do Airflow.

  2. Na barra de ferramentas, selecione Administração > Variáveis.

  3. Na página Variável de lista, clique em Adicionar um novo registo.

  4. Na página Adicionar variável, introduza as seguintes informações:

    • Tecla:my_value
    • Val: example_value
  5. Clique em Guardar.

Se o seu ambiente usar o Airflow 1, execute o seguinte comando:

  1. Aceda à IU do Airflow.

  2. Na barra de ferramentas, selecione Administração > Variáveis.

  3. Na página Variáveis, clique no separador Criar.

  4. Na página Variável, introduza as seguintes informações:

    • Tecla:my_value
    • Val: example_value
  5. Clique em Guardar.

gcloud

Introduza o seguinte comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Se o seu ambiente usar o Airflow 1, execute o seguinte comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Substituir:

  • ENVIRONMENT com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.

O exemplo seguinte demonstra como usar modelos Jinja com KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Fluxo de ar 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Use segredos e ConfigMaps do Kubernetes

Um segredo do Kubernetes é um objeto que contém dados confidenciais. Um ConfigMap do Kubernetes é um objeto que contém dados não confidenciais em pares de chave/valor.

No Cloud Composer 2, pode criar segredos e ConfigMaps através da CLI Google Cloud, da API ou do Terraform e, em seguida, aceder aos mesmos a partir do KubernetesPodOperator.

Acerca dos ficheiros de configuração YAML

Quando cria um segredo do Kubernetes ou um ConfigMap através da Google Cloud CLI e da API, fornece um ficheiro no formato YAML. Este ficheiro tem de seguir o mesmo formato usado pelos segredos e pelos mapas de configuração do Kubernetes. A documentação do Kubernetes fornece muitos exemplos de código de ConfigMaps e Secrets. Para começar, pode consultar a página Distribua credenciais de forma segura através de segredos e ConfigMaps.

Tal como nos segredos do Kubernetes, use a representação base64 quando definir valores nos segredos.

Para codificar um valor, pode usar o seguinte comando (esta é uma das muitas formas de obter um valor codificado em base64):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Saída:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Os dois exemplos de ficheiros YAML seguintes são usados em exemplos mais adiante neste guia. Exemplo de ficheiro de configuração YAML para um secret do Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Outro exemplo que demonstra como incluir ficheiros. Tal como no exemplo anterior, primeiro codifique o conteúdo de um ficheiro (cat ./key.json | base64) e, em seguida, faculte este valor no ficheiro YAML:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

Um exemplo de um ficheiro de configuração YAML para um ConfigMap. Não precisa de usar a representação base64 em ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Faça a gestão dos segredos do Kubernetes

No Cloud Composer 2, cria segredos através da CLI do Google Cloud e kubectl:

  1. Obtenha informações sobre o cluster do seu ambiente:

    1. Execute o seguinte comando:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Substituir:

      • ENVIRONMENT com o nome do seu ambiente.
      • LOCATION com a região onde o ambiente do Cloud Composer está localizado.

      O resultado deste comando usa o seguinte formato: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>.

    2. Para obter o ID do cluster do GKE, copie o resultado após /clusters/ (termina em -gke).

    3. Para obter a zona, copie o resultado depois de /zones/.

  2. Estabeleça ligação ao seu cluster do GKE com o seguinte comando:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Substituir:

    • CLUSTER_ID: o ID do cluster do ambiente.
    • PROJECT_ID: o ID do projeto.
    • ZONE com a zona onde o cluster do ambiente está localizado.
  3. Crie segredos do Kubernetes:

    Os comandos seguintes demonstram duas abordagens diferentes para criar segredos do Kubernetes. A abordagem --from-literal usa pares de chave-valor. A abordagem --from-file usa o conteúdo dos ficheiros.

    • Para criar um Kubernetes Secret fornecendo pares de chave-valor, execute o seguinte comando. Este exemplo cria um segredo denominado airflow-secrets que tem um campo sql_alchemy_conn com o valor de test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • Para criar um segredo do Kubernetes fornecendo o conteúdo do ficheiro, execute o comando seguinte. Este exemplo cria um segredo denominado service-account que tem o campo service-account.json com o valor retirado do conteúdo de um ficheiro ./key.json local.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Use segredos do Kubernetes nos seus DAGs

Este exemplo mostra duas formas de usar segredos do Kubernetes: como uma variável de ambiente e como um volume montado pelo pod.

O primeiro secret, airflow-secrets, está definido como uma variável de ambiente do Kubernetes denominada SQL_CONN (em oposição a uma variável de ambiente do Airflow ou do Cloud Composer).

O segundo segredo, service-account, monta service-account.json, um ficheiro com um token de conta de serviço, em /var/secrets/google.

Veja o aspeto dos objetos secretos:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Fluxo de ar 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

O nome do primeiro segredo do Kubernetes é definido na variável secret_env. Este Secret tem o nome airflow-secrets. O parâmetro deploy_type especifica que tem de ser exposto como uma variável de ambiente. O nome da variável de ambiente é SQL_CONN, conforme especificado no parâmetro deploy_target. Por último, o valor da variável de ambiente SQL_CONN é definido para o valor da chave sql_alchemy_conn.

O nome do segundo secret do Kubernetes é definido na variável secret_volume. Este Secret tem o nome service-account. É exposto como um volume, conforme especificado no parâmetro deploy_type. O caminho do ficheiro a montar, deploy_target, é /var/secrets/google. Por fim, o key do segredo armazenado no deploy_target é service-account.json.

Veja o aspeto da configuração do operador:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Fluxo de ar 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Informações sobre o fornecedor do Kubernetes da CNCF

O KubernetesPodOperator é implementado no fornecedor apache-airflow-providers-cncf-kubernetes.

Para ver notas de lançamento detalhadas do fornecedor do CNCF Kubernetes, consulte o Website do fornecedor do CNCF Kubernetes.

Versão 6.0.0

Na versão 6.0.0 do pacote do fornecedor Kubernetes da CNCF, a ligação kubernetes_default é usada por predefinição no KubernetesPodOperator.

Se especificou uma ligação personalizada na versão 5.0.0, esta ligação personalizada continua a ser usada pelo operador. Para voltar a usar a ligação kubernetes_default, é recomendável ajustar os DAGs em conformidade.

Versão 5.0.0

Esta versão introduz algumas alterações incompatíveis com versões anteriores em comparação com a versão 4.4.0. As mais importantes estão relacionadas com a ligação kubernetes_default, que não é usada na versão 5.0.0.

  • A associação kubernetes_default tem de ser modificada. O caminho de configuração do Kubernetes tem de estar definido como /home/airflow/composer_kube_config (conforme mostrado na figura seguinte). Em alternativa, tem de adicionar config_file à configuração do KubernetesPodOperator (conforme mostrado no exemplo de código seguinte).
Campo do caminho do ficheiro de configuração do Kube na IU do Airflow
Figura 1. IU do Airflow, modificando a ligação kubernetes_default (clique para aumentar)
  • Modifique o código de uma tarefa com o KubernetesPodOperator da seguinte forma:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

Para mais informações acerca da versão 5.0.0, consulte as notas de lançamento do fornecedor do CNCF Kubernetes.

Resolução de problemas

Esta secção oferece conselhos para resolver problemas comuns do KubernetesPodOperator:

Ver registos

Ao resolver problemas, pode verificar os registos pela seguinte ordem:

  1. Registos de tarefas do Airflow:

    1. Na Google Cloud consola, aceda à página Ambientes.

      Aceder a Ambientes

    2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

    3. Aceda ao separador DAGs.

    4. Clique no nome do DAG e, de seguida, clique na execução do DAG para ver os detalhes e os registos.

  2. Registos do programador do Airflow:

    1. Aceda à página Detalhes do ambiente.

    2. Aceda ao separador Registos.

    3. Inspecione os registos do programador do Airflow.

  3. Registos de pods na Google Cloud consola, em cargas de trabalho do GKE. Estes registos incluem o ficheiro YAML de definição do pod, os eventos do pod e os detalhes do pod.

Códigos de retorno diferentes de zero

Quando usar o KubernetesPodOperator (e o GKEStartPodOperator), o código de retorno do ponto de entrada do contentor determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam uma falha.

Um padrão comum é executar um script de shell como o ponto de entrada do contentor para agrupar várias operações no contentor.

Se estiver a escrever um script deste tipo, recomendamos que inclua o comando set -e na parte superior do script para que os comandos com falhas no script terminem o script e propaguem a falha para a instância da tarefa do Airflow.

Limites de tempo de agrupamentos

O limite de tempo predefinido para KubernetesPodOperator é de 120 segundos, o que pode resultar em limites de tempo que ocorrem antes da transferência de imagens maiores. Pode aumentar o tempo limite alterando o parâmetro startup_timeout_seconds quando cria o KubernetesPodOperator.

Quando um pod excede o limite de tempo, o registo específico da tarefa está disponível na IU do Airflow. Por exemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Os tempos limite dos pods também podem ocorrer quando a conta de serviço do Cloud Composer não tem as autorizações do IAM necessárias para realizar a tarefa em questão. Para verificar isto, consulte os erros ao nível do pod através dos painéis de controlo do GKE para ver os registos da sua carga de trabalho específica ou use o Cloud Logging.

Falha ao estabelecer uma nova ligação

A atualização automática está ativada por predefinição nos clusters do GKE. Se um node pool estiver num cluster que está a ser atualizado, pode ver o seguinte erro:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar se o cluster está a ser atualizado, na Google Cloud consola, aceda à página Clusters Kubernetes e procure o ícone de carregamento junto ao nome do cluster do seu ambiente.

O que se segue?