Usar o CeleryKubernetesExecutor

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, explicamos como ativar o CeleryKubernetesExecutor no Cloud Composer e usar o KubernetesExecutor nos seus DAGs.

Sobre o CeleryKubernetesExecutor

O CeleryKubernetesExecutor é um tipo de executor que pode usar o CeleryExecutor e o KubernetesExecutor ao mesmo tempo. O Airflow seleciona o executor com base na fila definida para a tarefa. Em um DAG, é possível executar algumas tarefas com o CeleryExecutor e outras com o KubernetesExecutor:

  • O CeleryExecutor é otimizado para execução rápida e escalonável de tarefas.
  • O KubernetesExecutor foi projetado para execução de tarefas que exigem muitos recursos e execução de tarefas isoladas.

CeleryKubernetesExecutor no Cloud Composer

O CeleryKubernetesExecutor no Cloud Composer permite usar o KubernetesExecutor nas suas tarefas. Não é possível usar o KubernetesExecutor no Cloud Composer separadamente do CeleryKubernetesExecutor.

O Cloud Composer executa tarefas que você executa com o KubernetesExecutor no cluster do ambiente, no mesmo namespace com os workers do Airflow. Essas tarefas têm as mesmas vinculações que os workers do Airflow e podem acessar recursos no seu projeto.

As tarefas executadas com o KubernetesExecutor usam o modelo de preços do Cloud Composer, já que os pods com essas tarefas são executados no cluster do ambiente. As SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) se aplicam a esses pods.

Recomendamos executar tarefas com o CeleryExecutor quando:

  • O tempo de inicialização da tarefa é importante.
  • As tarefas não exigem isolamento de tempo de execução e não consomem muitos recursos.

Recomendamos executar tarefas com o KubernetesExecutor quando:

  • As tarefas exigem isolamento de tempo de execução. Por exemplo, para que as tarefas não disputem memória e CPU, já que são executadas nos próprios pods.
  • As tarefas consomem muitos recursos, e você quer controlar os recursos de CPU e memória disponíveis.

KubernetesExecutor em comparação com KubernetesPodOperator

A execução de tarefas com o KubernetesExecutor é semelhante a executar tarefas usando o KubernetesPodOperator. As tarefas são executadas em pods, oferecendo isolamento de tarefas no nível do pod e melhor gerenciamento de recursos.

No entanto, há algumas diferenças importantes:

  • O KubernetesExecutor executa tarefas apenas no namespace versionado do Cloud Composer do seu ambiente. Não é possível mudar esse namespace no Cloud Composer. É possível especificar um namespace em que o KubernetesPodOperator executa tarefas de pod.
  • O KubernetesExecutor pode usar qualquer operador integrado do Airflow. O KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contêiner.
  • O KubernetesExecutor usa a imagem padrão do Docker do Cloud Composer com o mesmo Python, modificações de opção de configuração do Airflow, variáveis de ambiente e pacotes PyPI definidos no seu ambiente do Cloud Composer.

Sobre imagens do Docker

Por padrão, o KubernetesExecutor inicia tarefas usando a mesma imagem do Docker que o Cloud Composer usa para workers do Celery. Essa é a imagem do Cloud Composer do seu ambiente, com todas as mudanças especificadas, como pacotes PyPI personalizados ou variáveis de ambiente.

Antes de começar

  • É possível usar o CeleryKubernetesExecutor no Cloud Composer 3.

  • Não é possível usar nenhum executor que não seja o CeleryKubernetesExecutor no Cloud Composer 3. Isso significa que é possível executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG, mas não é possível configurar seu ambiente para usar apenas KubernetesExecutor ou CeleryExecutor.

Configurar o CeleryKubernetesExecutor

Talvez você queira substituir as opções de configuração do Airflow relacionadas ao KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Essa opção define o número de chamadas de criação de pods de worker do Kubernetes por loop do programador. O valor padrão é 1, então apenas um pod é iniciado por pulsação do programador. Se você usa muito o KubernetesExecutor, recomendamos aumentar esse valor.

  • [kubernetes]worker_pods_pending_timeout

    Essa opção define, em segundos, por quanto tempo um worker pode permanecer no estado Pending (o pod está sendo criado) antes de ser considerado com falha. O valor padrão é de 5 minutos.

Executar tarefas com KubernetesExecutor ou CeleryExecutor

É possível executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG:

  • Para executar uma tarefa com o KubernetesExecutor, especifique o valor kubernetes no parâmetro queue de uma tarefa.
  • Para executar uma tarefa com o CeleryExecutor, omita o parâmetro queue.

O exemplo a seguir executa a tarefa task-kubernetes usando KubernetesExecutor e a tarefa task-celery usando CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Executar comandos da CLI do Airflow relacionados ao KubernetesExecutor

É possível executar vários comandos da CLI do Airflow relacionados ao KubernetesExecutor usando gcloud.

Personalizar a especificação do pod de worker

É possível personalizar a especificação do pod de worker transmitindo-a no parâmetro executor_config de uma tarefa. É possível usar isso para definir requisitos personalizados de CPU e memória.

É possível substituir toda a especificação do pod de worker usada para executar uma tarefa. Para recuperar a especificação do pod de uma tarefa usada pelo KubernetesExecutor, execute o comando kubernetes generate-dag-yaml da CLI do Airflow

Para mais informações sobre como personalizar a especificação do pod de worker, consulte a documentação do Airflow.

O Cloud Composer 3 é compatível com os seguintes valores para requisitos de recursos:

Recurso Mínimo Máximo Etapa
CPU 0,25 32 Valores de intervalo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. Os valores solicitados são arredondados para o valor de etapa compatível mais próximo (por exemplo, de 5 para 6).
Memória 2G (GB) 128G (GB) Valores de intervalo: 2, 3, 4, 5, ..., 128. Os valores solicitados são arredondados para o valor de etapa compatível mais próximo (por exemplo, 3,5G para 4G).
Armazenamento - 100G (GB) Qualquer valor Se mais de 100 GB forem solicitados, apenas 100 GB serão fornecidos.

Para mais informações sobre unidades de recursos no Kubernetes, consulte Unidades de recursos no Kubernetes.

O exemplo a seguir demonstra uma tarefa que usa uma especificação de pod de worker personalizada:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

Ver registros de tarefas

Os registros das tarefas executadas pelo KubernetesExecutor estão disponíveis na guia Registros, junto com os registros das tarefas executadas pelo CeleryExecutor:

  1. No console Google Cloud , acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros.

  4. Acesse Todos os registros > Registros do Airflow > Workers.

  5. Os workers chamados airflow-k8s-worker executam tarefas do KubernetesExecutor. Para procurar registros de uma tarefa específica, use um ID de DAG ou de tarefa como palavra-chave na pesquisa.

A seguir