Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página explica como ativar o CeleryKubernetesExecutor no Cloud Composer e como usar o KubernetesExecutor nos seus DAGs.
Acerca do CeleryKubernetesExecutor
O CeleryKubernetesExecutor é um tipo de executor que pode usar o CeleryExecutor e o KubernetesExecutor ao mesmo tempo. O Airflow seleciona o executor com base na fila que definir para a tarefa. Num DAG, pode executar algumas tarefas com o CeleryExecutor e outras tarefas com o KubernetesExecutor:
- O CeleryExecutor está otimizado para uma execução rápida e escalável de tarefas.
- O KubernetesExecutor foi concebido para a execução de tarefas com utilização intensiva de recursos e a execução de tarefas isoladamente.
CeleryKubernetesExecutor no Cloud Composer
O CeleryKubernetesExecutor no Cloud Composer oferece a capacidade de usar o KubernetesExecutor para as suas tarefas. Não é possível usar o KubernetesExecutor no Cloud Composer separadamente do CeleryKubernetesExecutor.
O Cloud Composer executa tarefas que executa com o KubernetesExecutor no cluster do seu ambiente, no mesmo espaço de nomes com os trabalhadores do Airflow. Estas tarefas têm as mesmas associações que os trabalhadores do Airflow e podem aceder a recursos no seu projeto.
As tarefas que executa com o KubernetesExecutor usam o modelo de preços do Cloud Composer, uma vez que os pods com estas tarefas são executados no cluster do seu ambiente. Os SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) aplicam-se a estes pods.
Recomendamos que execute tarefas com o CeleryExecutor quando:
- O tempo de arranque da tarefa é importante.
- As tarefas não requerem isolamento de tempo de execução e não consomem muitos recursos.
Recomendamos que execute tarefas com o KubernetesExecutor quando:
- As tarefas requerem isolamento do tempo de execução. Por exemplo, para que as tarefas não concorram pela memória e pela CPU, uma vez que são executadas nos seus próprios pods.
- As tarefas consomem muitos recursos e quer controlar os recursos de CPU e memória disponíveis.
KubernetesExecutor em comparação com KubernetesPodOperator
A execução de tarefas com o KubernetesExecutor é semelhante à execução de tarefas com o KubernetesPodOperator. As tarefas são executadas em pods, o que proporciona isolamento de tarefas ao nível do pod e uma melhor gestão de recursos.
No entanto, existem algumas diferenças importantes:
- O KubernetesExecutor executa tarefas apenas no espaço de nomes do Cloud Composer com controlo de versões do seu ambiente. Não é possível alterar este espaço de nomes no Cloud Composer. Pode especificar um espaço de nomes onde o KubernetesPodOperator executa tarefas de pod.
- O KubernetesExecutor pode usar qualquer operador do Airflow integrado. O KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contentor.
- O KubernetesExecutor usa a imagem Docker do Cloud Composer predefinida com as mesmas substituições da opção de configuração do Python e do Airflow, variáveis de ambiente e pacotes PyPI que estão definidos no seu ambiente do Cloud Composer.
Acerca das imagens de Docker
Por predefinição, o KubernetesExecutor inicia tarefas com a mesma imagem do Docker que o Cloud Composer usa para os trabalhadores do Celery. Esta é a imagem do Cloud Composer para o seu ambiente, com todas as alterações que especificou para o seu ambiente, como pacotes PyPI personalizados ou variáveis de ambiente.
Antes de começar
Pode usar o CeleryKubernetesExecutor no Cloud Composer 3.
Não é possível usar nenhum executor que não seja o CeleryKubernetesExecutor no Cloud Composer 3. Isto significa que pode executar tarefas com o CeleryExecutor, o KubernetesExecutor ou ambos num DAG, mas não é possível configurar o seu ambiente para usar apenas o KubernetesExecutor ou o CeleryExecutor.
Configure o CeleryKubernetesExecutor
Recomendamos que substitua as opções de configuração do Airflow existentes relacionadas com o KubernetesExecutor:
[kubernetes]worker_pods_creation_batch_size
Esta opção define o número de chamadas de criação de pods de trabalho do Kubernetes por ciclo do agendador. O valor predefinido é
1
, pelo que apenas é iniciado um único pod por pulsação do programador. Se usar o KubernetesExecutor com frequência, recomendamos que aumente este valor.[kubernetes]worker_pods_pending_timeout
Esta opção define, em segundos, durante quanto tempo um trabalhador pode permanecer no estado
Pending
(o pod está a ser criado) antes de ser considerado com falhas. O valor predefinido é de 5 minutos.
Execute tarefas com o KubernetesExecutor ou o CeleryExecutor
Pode executar tarefas com o CeleryExecutor, o KubernetesExecutor ou ambos num DAG:
- Para executar uma tarefa com KubernetesExecutor, especifique o valor
kubernetes
no parâmetroqueue
de uma tarefa. - Para executar uma tarefa com CeleryExecutor, omita o parâmetro
queue
.
O exemplo seguinte executa a tarefa task-kubernetes
com o KubernetesExecutor e a tarefa task-celery
com o CeleryExecutor:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
Execute comandos da CLI do Airflow relacionados com o KubernetesExecutor
Pode executar vários comandos da CLI do Airflow relacionados com o KubernetesExecutor usando o gcloud
.
Personalize a especificação do pod de trabalho
Pode personalizar a especificação do pod de trabalho transmitindo-a no parâmetro executor_config
de uma tarefa. Pode usar esta opção para definir requisitos personalizados de CPU e memória.
Pode substituir toda a especificação do agrupamento de trabalhadores usada para executar uma tarefa. Para
obter a especificação do pod de uma tarefa usada pelo KubernetesExecutor, pode
executar o comando kubernetes generate-dag-yaml
da CLI do Airflow.
Para mais informações sobre a personalização da especificação do pod de trabalho, consulte a documentação do Airflow.
O Cloud Composer 3 suporta os seguintes valores para requisitos de recursos:
Recurso | Mínimo | Máximo | Passo |
---|---|---|---|
CPU | 0,25 | 32 | Valores de passo: 0,25, 0,5, 1, 2, 4, 6, 8, 10, …, 32. Os valores pedidos são arredondados para cima para o valor de passo suportado mais próximo (por exemplo, 5 para 6). |
Memória | 2G (GB) | 128 GB | Valores de passo: 2, 3, 4, 5, …, 128. Os valores pedidos são arredondados para o valor de passo suportado mais próximo (por exemplo, 3,5G para 4G). |
Armazenamento | - | 100G (GB) | Qualquer valor. Se forem pedidos mais de 100 GB, só são disponibilizados 100 GB. |
Para mais informações sobre as unidades de recursos no Kubernetes, consulte o artigo Unidades de recursos no Kubernetes.
O exemplo seguinte demonstra uma tarefa que usa a especificação do pod de trabalho personalizado:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
Ver registos de tarefas
Os registos de tarefas executadas pelo KubernetesExecutor estão disponíveis no separador Registos, juntamente com os registos de tarefas executadas pelo CeleryExecutor:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador Registos.
Navegue para Todos os registos > Registos do Airflow > Trabalhadores.
Os trabalhadores com o nome
airflow-k8s-worker
executam tarefas de KubernetesExecutor. Para procurar registos de uma tarefa específica, pode usar um ID de DAG ou um ID de tarefa como palavra-chave na pesquisa.
O que se segue?
- Resolução de problemas do KubernetesExecutor
- Usar o KubernetesPodOperator
- Usar operadores do GKE
- Substituir opções de configuração do Airflow