Usar o CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, explicamos como ativar o CeleryKubernetesExecutor no Cloud Composer e como usar o KubernetesExecutor nos DAGs.

Sobre o CeleryKubernetesExecutor

CeleryKubernetesExecutor é um tipo de executor que pode usar o CeleryExecutor e o KubernetesExecutor ao mesmo tempo. O Airflow seleciona o executor com base na fila definida para a tarefa. Em um DAG, é possível executar algumas tarefas com o CeleryExecutor e outras tarefas com o KubernetesExecutor:

  • O CeleryExecutor é otimizado para a execução rápida e escalonável de tarefas.
  • O KubernetesExecutor foi projetado para a execução de tarefas com uso intensivo de recursos e execução de tarefas isoladamente.

CeleryKubernetesExecutor no Cloud Composer

O CeleryKubernetesExecutor no Cloud Composer permite usar o KubernetesExecutor para suas tarefas. Não é possível usar o KubernetesExecutor no Cloud Composer separadamente do CeleryKubernetesExecutor.

O Cloud Composer executa tarefas que você executa com KubernetesExecutor no cluster do ambiente, no mesmo namespace com workers do Airflow. Essas tarefas têm as mesmas vinculações que os workers do Airflow e podem acessar recursos no seu projeto.

As tarefas que você executa com o KubernetesExecutor usam o modelo de preços do Cloud Composer, já que os pods com essas tarefas são executados no cluster do ambiente. As SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) se aplicam a esses pods.

Recomendamos executar tarefas com o CeleryExecutor quando:

  • O tempo de inicialização da tarefa é importante.
  • As tarefas não exigem isolamento do ambiente de execução e não consomem muitos recursos.

Recomendamos executar tarefas com o KubernetesExecutor quando:

  • As tarefas exigem isolamento do ambiente de execução. Por exemplo, para que as tarefas não concorram por memória e CPU, já que são executadas nos próprios pods.
  • As tarefas exigem bibliotecas de sistema adicionais (ou pacotes PyPI).
  • As tarefas exigem muitos recursos, e você quer controlar os recursos de CPU e memória disponíveis.

KubernetesExecutor em comparação com KubernetesPodOperator

A execução de tarefas com KubernetesExecutor é semelhante à execução de tarefas usando o KubernetesPodOperator. As tarefas são executadas em pods, oferecendo isolamento de tarefas no nível do pod e melhor gerenciamento de recursos.

No entanto, existem algumas diferenças importantes:

  • O KubernetesExecutor executa tarefas apenas no namespace do Cloud Composer com controle de versão do ambiente. Não é possível alterar esse namespace no Cloud Composer. É possível especificar um namespace em que o KubernetesPodOperator executa tarefas de pod.
  • KubernetesExecutor pode usar qualquer operador integrado do Airflow. O KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contêiner.
  • O KubernetesExecutor usa a imagem padrão do Docker do Cloud Composer com as mesmas modificações de opções de configuração do Python, do Airflow, variáveis de ambiente e pacotes PyPI definidos no ambiente do Cloud Composer.

Sobre as imagens do Docker

Por padrão, o KubernetesExecutor inicia tarefas usando a mesma imagem do Docker que o Cloud Composer usa para os workers do Celery. Essa é a imagem do Cloud Composer do ambiente com todas as alterações especificadas nele, como pacotes PyPI personalizados ou variáveis de ambiente.

Antes de começar

  • É possível usar o CeleryKubernetesExecutor no Cloud Composer 3.

  • Não é possível usar nenhum executor diferente do CeleryKubernetesExecutor no Cloud Composer 3. Isso significa que é possível executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG, mas não é possível configurar o ambiente para usar apenas KubernetesExecutor ou CeleryExecutor.

Configurar o CeleryKubernetesExecutor

Talvez você queira substituir as opções de configuração atuais do Airflow relacionadas ao KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Essa opção define o número de chamadas de criação de pods de worker do Kubernetes por loop do programador. O valor padrão é 1. Portanto, apenas um pod é iniciado por sinal de funcionamento do programador. Se você usa muito o KubernetesExecutor, recomendamos aumentar esse valor.

  • [kubernetes]worker_pods_pending_timeout

    Essa opção define, em segundos, quanto tempo um worker pode permanecer no estado Pending (o pod está sendo criado) antes de ser considerado com falha. O valor padrão é 5 minutos.

Executar tarefas com KubernetesExecutor ou CeleryExecutor

É possível executar tarefas usando o CeleryExecutor, KubernetesExecutor ou ambos em um DAG:

  • Para executar uma tarefa com KubernetesExecutor, especifique o valor kubernetes no parâmetro queue de uma tarefa.
  • Para executar uma tarefa com o CeleryExecutor, omita o parâmetro queue.

O exemplo a seguir executa a tarefa task-kubernetes usando KubernetesExecutor e a tarefa task-celery usando o CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Executar comandos da CLI do Airflow relacionados ao KubernetesExecutor

É possível executar vários comandos da CLI do Airflow relacionados ao KubernetesExecutor usando gcloud.

Personalizar especificação do pod de worker

É possível personalizar a especificação do pod de worker ao transmiti-la no parâmetro executor_config de uma tarefa. Use isso para definir requisitos personalizados de CPU e memória.

É possível substituir toda a especificação do pod de worker usada para executar uma tarefa. Para recuperar a especificação do pod de uma tarefa usada pelo KubernetesExecutor, é possível executar o comando kubernetes generate-dag-yaml da CLI do Airflow.

Para mais informações sobre como personalizar a especificação de pod de worker, consulte a documentação do Airflow.

O exemplo a seguir demonstra uma tarefa que usa a especificação do pod de worker personalizado:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Ver registros de tarefas

Os registros de tarefas executadas pelo KubernetesExecutor estão disponíveis na guia Registros, junto com os registros de tarefas executadas pelo CeleryExecutor:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros.

  4. Navegue até Todos os registros > Registros do Airflow > Workers.

  5. Os workers chamados airflow-k8s-worker executam tarefas KubernetesExecutor. Para procurar registros de uma tarefa específica, é possível usar um ID do DAG ou da tarefa como uma palavra-chave na pesquisa.

A seguir