Usar o CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, explicamos como ativar o CeleryKubernetesExecutor no Cloud Composer e como usar KubernetesExecutor nos DAGs.

Sobre o CeleryKubernetesExecutor

O CeleryKubernetesExecutor é um tipo de executor que pode usar CeleryExecutor e KubernetesExecutor ao mesmo tempo de resposta. O Airflow seleciona o executor com base na fila definida para o tarefa. Em um DAG, é possível executar algumas tarefas com o CeleryExecutor e outras com KubernetesExecutor:

  • O CeleryExecutor é otimizado para a execução rápida e escalonável de tarefas.
  • KubernetesExecutor foi projetado para a execução de tarefas que consomem muitos recursos e executar tarefas isoladamente.

CeleryKubernetesExecutor no Cloud Composer

Com o CeleryKubernetesExecutor, no Cloud Composer, é possível usar KubernetesExecutor nas tarefas. Não é possível usar KubernetesExecutor no Cloud Composer separado do o CeleryKubernetesExecutor.

O Cloud Composer executa tarefas que você executa com KubernetesExecutor no cluster do ambiente e no mesmo namespace que os workers do Airflow. Essas as tarefas têm as mesmas vinculações que o Airflow workers e pode acessar recursos no seu projeto.

As tarefas que você executa com KubernetesExecutor usam o Modelo de preços do Cloud Composer, já que os pods com essas as tarefas são executadas no cluster do ambiente. SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) aplicam-se a esses pods.

Recomendamos executar tarefas com o CeleryExecutor quando:

  • O tempo de inicialização da tarefa é importante.
  • As tarefas não exigem isolamento do ambiente de execução e não consomem muitos recursos.

Recomendamos executar tarefas com o KubernetesExecutor quando:

  • As tarefas exigem isolamento do ambiente de execução. Por exemplo, para que as tarefas não concorram memória e CPU, já que são executados nos próprios pods.
  • As tarefas exigem bibliotecas de sistema adicionais (ou pacotes PyPI).
  • As tarefas exigem muitos recursos, e você quer controlar os recursos disponíveis recursos de CPU e memória.

KubernetesExecutor em comparação com KubernetesPodOperator

A execução de tarefas com KubernetesExecutor é semelhante à executar tarefas usando o KubernetesPodOperator. As tarefas são executadas em os pods, fornecendo isolamento de tarefas no nível do pod e melhor gerenciamento de recursos.

No entanto, existem algumas diferenças importantes:

  • KubernetesExecutor executa tarefas apenas no Cloud Composer com controle de versão namespace do seu ambiente. Não é possível alterar esse namespace no Cloud Composer. É possível especificar um namespace em que o KubernetesPodOperator executa tarefas de pod.
  • KubernetesExecutor pode usar qualquer operador integrado do Airflow. KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contêiner.
  • KubernetesExecutor usa a imagem Docker padrão do Cloud Composer com as mesmas substituições de opções de configuração do Python, do Airflow, variáveis e pacotes PyPI definidos no ambiente do Cloud Composer.

Sobre as imagens do Docker

Por padrão, KubernetesExecutor inicia tarefas usando a mesma imagem Docker que O Cloud Composer usa para workers do Celery. Esta é a imagem do Cloud Composer para seu ambiente, com todas as mudanças especificadas para seu ambiente, como PyPI personalizado pacotes ou variáveis de ambiente.

Antes de começar

  • É possível usar o CeleryKubernetesExecutor no Cloud Composer 3.

  • Não é possível usar nenhum executor diferente do CeleryKubernetesExecutor. no Cloud Composer 3. Isso significa que você pode executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG, mas não possível configurar seu ambiente para usar apenas KubernetesExecutor ou o CeleryExecutor.

Configurar o CeleryKubernetesExecutor

substitua a configuração atual do Airflow opções relacionadas a KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Essa opção define o número de chamadas de criação de pods de worker do Kubernetes por loop do programador. O valor padrão é 1, então apenas um pod é iniciado por sinal de funcionamento do programador. Se você usa muito o KubernetesExecutor, recomendado para aumentar esse valor.

  • [kubernetes]worker_pods_pending_timeout

    Essa opção define, em segundos, por quanto tempo um worker pode permanecer no Pending estado (o pod está sendo criado) antes de ser considerado com falha. O padrão é de 5 minutos.

Executar tarefas com KubernetesExecutor ou CeleryExecutor

É possível executar tarefas usando o CeleryExecutor, KubernetesExecutor ou ambos em um DAG:

  • Para executar uma tarefa com KubernetesExecutor, especifique o valor kubernetes no queue de uma tarefa.
  • Para executar uma tarefa com o CeleryExecutor, omita o parâmetro queue.
.

O exemplo a seguir executa a tarefa task-kubernetes usando KubernetesExecutor e a tarefa task-celery usando o CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Executar comandos da CLI do Airflow relacionados ao KubernetesExecutor

É possível executar várias Comandos da CLI do Airflow relacionados ao KubernetesExecutor usando gcloud.

Personalizar especificação do pod de worker

É possível personalizar a especificação do pod de worker ao transmiti-la no executor_config de uma tarefa. É possível usar isso para definir CPU e memória personalizados e cumprimento de requisitos regulatórios.

É possível substituir toda a especificação do pod de worker usada para executar uma tarefa. Para recuperar a especificação do pod de uma tarefa usada pelo KubernetesExecutor, é possível executar a CLI kubernetes generate-dag-yaml do Airflow kubectl.

Para mais informações sobre como personalizar a especificação do pod de worker, consulte Documentação do Airflow.

O exemplo a seguir demonstra uma tarefa que usa a especificação do pod de worker personalizado:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Ver registros de tarefas

Os registros das tarefas executadas pelo KubernetesExecutor estão disponíveis na guia Registros. com os registros de tarefas executadas pelo CeleryExecutor:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros.

  4. Acesse Todos os registros > Registros do Airflow. > Workers.

  5. Execução de workers chamados airflow-k8s-worker KubernetesExecutor. Para procurar os registros de uma tarefa específica, usar um ID do DAG ou da tarefa como uma palavra-chave na pesquisa.

A seguir