Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Nesta página, explicamos como ativar o CeleryKubernetesExecutor no Cloud Composer e como usar o KubernetesExecutor nas suas DAGs.
Sobre o CeleryKubernetesExecutor
CeleryKubernetesExecutor é um tipo de executor que pode usar CeleryExecutor e KubernetesExecutor ao mesmo tempo. O Airflow seleciona o executor com base na fila que você define para a tarefa. Em um DAG, é possível executar algumas tarefas com o CeleryExecutor e outras com o KubernetesExecutor:
- O CeleryExecutor é otimizado para execução rápida e escalonável de tarefas.
- O KubernetesExecutor foi projetado para executar tarefas que consomem muitos recursos e executar tarefas isoladamente.
CeleryKubernetesExecutor no Cloud Composer
O CeleryKubernetesExecutor no Cloud Composer permite o uso do KubernetesExecutor para suas tarefas. Não é possível usar o KubernetesExecutor no Cloud Composer separadamente do CeleryKubernetesExecutor.
O Cloud Composer executa tarefas que você executa com o KubernetesExecutor no cluster do ambiente, no mesmo namespace com os workers do Airflow. Essas tarefas têm as mesmas vinculações que os workers do Airflow e podem acessar recursos no seu projeto.
As tarefas executadas com o KubernetesExecutor usam o modelo de preços do Cloud Composer, já que os pods com essas tarefas são executados no cluster do ambiente. As SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) se aplicam a esses pods.
Recomendamos executar tarefas com o CeleryExecutor quando:
- O tempo de inicialização da tarefa é importante.
- As tarefas não exigem isolamento de execução e não consomem muitos recursos.
Recomendamos executar tarefas com o KubernetesExecutor quando:
- As tarefas exigem isolamento de execução. Por exemplo, para que as tarefas não concorram por memória e CPU, já que elas são executadas nos próprios pods.
- As tarefas consomem muitos recursos, e você quer controlar os recursos de CPU e memória disponíveis.
KubernetesExecutor em comparação com KubernetesPodOperator
Executar tarefas com o KubernetesExecutor é semelhante a executar tarefas usando o KubernetesPodOperator. As tarefas são executadas em pods, fornecendo isolamento de tarefas no nível do pod e melhor gerenciamento de recursos.
No entanto, há algumas diferenças importantes:
- O KubernetesExecutor executa tarefas apenas no namespace versionado do Cloud Composer do seu ambiente. Não é possível mudar esse namespace no Cloud Composer. É possível especificar um namespace em que o KubernetesPodOperator executa tarefas de pod.
- O KubernetesExecutor pode usar qualquer operador integrado do Airflow. O KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contêiner.
- O KubernetesExecutor usa a imagem padrão do Docker do Cloud Composer com as mesmas substituições de opção de configuração do Python e do Airflow, variáveis de ambiente e pacotes PyPI definidos no seu ambiente do Cloud Composer.
Sobre as imagens do Docker
Por padrão, o KubernetesExecutor inicia tarefas usando a mesma imagem do Docker que o Cloud Composer usa para workers do Celery. Esta é a imagem do Cloud Composer para seu ambiente, com todas as mudanças especificadas, como pacotes PyPI personalizados ou variáveis de ambiente.
Antes de começar
É possível usar o CeleryKubernetesExecutor no Cloud Composer 3.
Não é possível usar nenhum executor, exceto o CeleryKubernetesExecutor, no Cloud Composer 3. Isso significa que você pode executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG, mas não é possível configurar o ambiente para usar apenas KubernetesExecutor ou CeleryExecutor.
Configurar o CeleryKubernetesExecutor
Talvez você queira substituir as opções de configuração do Airflow relacionadas ao KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Essa opção define o número de chamadas de criação de pods de worker do Kubernetes por ciclo de agendamento. O valor padrão é
1
, então apenas um pod é iniciado por batimento cardíaco do agendador. Se você usa o KubernetesExecutor com frequência, recomendamos aumentar esse valor.[kubernetes]worker_pods_pending_timeout
Essa opção define, em segundos, por quanto tempo um worker pode permanecer no estado
Pending
(o pod está sendo criado) antes de ser considerado com falha. O valor padrão é 5 minutos.
Executar tarefas com KubernetesExecutor ou CeleryExecutor
É possível executar tarefas usando o CeleryExecutor, o KubernetesExecutor ou ambos em um DAG:
- Para executar uma tarefa com o KubernetesExecutor, especifique o valor
kubernetes
no parâmetroqueue
de uma tarefa. - Para executar uma tarefa com o CeleryExecutor, omita o parâmetro
queue
.
O exemplo a seguir executa a tarefa task-kubernetes
usando
o KubernetesExecutor e a tarefa task-celery
usando o CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Executar comandos da CLI do Airflow relacionados ao KubernetesExecutor
É possível executar vários comandos da CLI do Airflow relacionados ao KubernetesExecutor usando gcloud
.
Personalizar a especificação do pod de worker
É possível personalizar a especificação do pod de worker transmitindo-a no parâmetro executor_config
de uma tarefa. Você pode usar isso para definir requisitos personalizados de CPU e
memória.
É possível substituir toda a especificação do pod de worker usada para executar uma tarefa. Para
extrair a especificação do pod de uma tarefa usada pelo KubernetesExecutor, execute o comando CLI kubernetes generate-dag-yaml
do Airflow.
Para mais informações sobre como personalizar a especificação de pods de worker, consulte a documentação do Airflow.
O Cloud Composer 3 oferece suporte aos seguintes valores para requisitos de recursos:
Recurso | Mínimo | Máximo | Etapa |
---|---|---|---|
CPU | 0,25 | 32 | Valores de intervalo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, ..., 32. Os valores solicitados são arredondados para o valor de etapa aceito mais próximo (por exemplo, de 5 para 6). |
Memória | 2G (GB) | 128 GB | Valores de incremento: 2, 3, 4, 5, ..., 128. Os valores solicitados são arredondados para o valor de etapa compatível mais próximo (por exemplo, de 3,5 G para 4G). |
Armazenamento | - | 100 GB | Qualquer valor Se mais de 100 GB forem solicitados, apenas 100 GB serão fornecidos. |
Para mais informações sobre unidades de recursos no Kubernetes, consulte Unidades de recursos no Kubernetes.
O exemplo a seguir demonstra uma tarefa que usa a especificação de pod de worker personalizado:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
Conferir registros de tarefas
Os registros de tarefas executadas pelo KubernetesExecutor estão disponíveis na guia Logs, junto com os registros de tarefas executadas pelo CeleryExecutor:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros.
Acesse Todos os registros > Registros do Airflow > Workers.
Os workers com o nome
airflow-k8s-worker
executam tarefas do KubernetesExecutor. Para procurar registros de uma tarefa específica, use um ID de DAG ou de tarefa como uma palavra-chave na pesquisa.
A seguir
- Solução de problemas do KubernetesExecutor
- Como usar o KubernetesPodOperator
- Como usar operadores do GKE
- Como modificar as opções de configuração do Airflow