Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Nesta página, descrevemos como usar o KubernetesPodOperator
para implantar
Pods do Kubernetes
do Cloud Composer para o Google Kubernetes Engine
cluster que faz parte do ambiente do Cloud Composer e garantir
que seu ambiente tem os recursos apropriados.
KubernetesPodOperator
inicializações
Pods do Kubernetes
no cluster do ambiente. Em comparação,
Os operadores do Google Kubernetes Engine executam pods do Kubernetes em um
que pode ser um cluster separado, não relacionado ao seu
de nuvem. Também é possível criar e excluir clusters usando
operadores do Google Kubernetes Engine.
O KubernetesPodOperator
é uma boa opção, se você precisar de:
- Dependências personalizadas do Python que não estão disponíveis no repositório PyPI público.
- Dependências binárias que não estão disponíveis na imagem do worker do Cloud Composer.
Esta página mostra um exemplo de DAG do Airflow que inclui o seguinte
Configurações do KubernetesPodOperator
:
- Configuração mínima: define somente os parâmetros obrigatórios.
- Configuração de modelo: usa parâmetros que você pode usar como modelo com o Jinja.
Configuração de variáveis secrets: transmite um objeto Secret do Kubernetes para o pod.
No Cloud Composer 2, a configuração de afinidade do pod não está disponível. Em vez disso, use os operadores do GKE para iniciar pods em um cluster diferente.
Configuração completa: inclui todas as configurações.
Antes de começar
No Cloud Composer 2, o cluster do ambiente é escalonado automaticamente. As cargas de trabalho extras que você executa usando
KubernetesPodOperator
são escalonadas independentemente do ambiente. Seu ambiente não é afetado pelo aumento da demanda de recursos, mas o cluster do ambiente aumenta ou diminui dependendo da demanda de recursos. O preço das cargas de trabalho extras executadas nos o cluster do seu ambiente segue a Modelo de preços do Cloud Composer 2 e usa SKUs de computação do Cloud Composer.O Cloud Composer 2 usa clusters do GKE com Federação de identidade da carga de trabalho para o GKE. Por padrão, os pods que são executados em um namespace recém-criado ou no namespace
composer-user-workloads
não podem acessar os recursos do Google Cloud. Ao usar Federação de identidade da carga de trabalho para o GKE, contas de serviço do Kubernetes associadas a os namespaces precisam ser mapeados a contas de serviço do Google Cloud para ativar a autorização de identidade do serviço para solicitações às APIs do Google e outros serviços.Por isso, se você executar pods no namespace
composer-user-workloads
ou um namespace recém-criado no cluster do seu ambiente, e as dependências Vinculações do IAM entre o Kubernetes e o Google Cloud contas de serviço não são criadas e esses pods não podem acessar recursos do seu projeto do Google Cloud.Se você quiser que seus pods tenham acesso aos recursos do Google Cloud, Depois, use o namespace
composer-user-workloads
ou crie seu próprio como descrito a seguir.Para dar acesso aos recursos do seu projeto, faça o seguinte: siga as orientações na federação de identidade da carga de trabalho para GKE e configure as vinculações:
- Crie um namespace separado no cluster do ambiente.
- Crie um vínculo entre as
composer-user-workloads/<namespace_name>
conta de serviço do Kubernetes e a conta de serviço do seu ambiente. - Adicione a anotação da conta de serviço do seu ambiente à conta de serviço do Kubernetes.
- Ao usar
KubernetesPodOperator
, especifique o namespace e a conta de serviço do Kubernetes nos parâmetrosnamespace
eservice_account_name
.
O Cloud Composer 2 usa clusters do GKE com a Identidade da carga de trabalho. O servidor de metadados do GKE leva alguns segundos para começar a aceitar solicitações em um pod recém-criado. Portanto, as tentativas usando a Identidade da carga de trabalho nos primeiros segundos da a vida útil do pod pode falhar. Para obter mais informações sobre essa limitação, consulte Restrições da Identidade da Carga de Trabalho.
O Cloud Composer 2 usa clusters do Autopilot, que introduzem a noção de classes de computação:
Por padrão, se nenhuma classe for selecionada, a classe
general-purpose
será usada ao criar pods usandoKubernetesPodOperator
.Cada classe está associada a propriedades e limites de recursos específicos, Você pode ler sobre eles em Documentação do Autopilot. Por exemplo, os pods executados na classe
general-purpose
podem usar até 110 GiB de memória.
Se a versão 5.0.0 do CNCF Kubernetes Provider for usada, siga as instruções documentada a seção CNCF Kubernetes Provider.
Configuração do KubernetesPodOperator
Para acompanhar este exemplo, coloque todo o arquivo kubernetes_pod_operator.py
na pasta dags/
do ambiente ou
adicione o código relevante KubernetesPodOperator
a um DAG.
As seções a seguir explicam cada configuração KubernetesPodOperator
no exemplo. Para mais informações sobre cada variável de configuração, consulte a referência do Airflow.
Configuração mínima
Para criar um KubernetesPodOperator
, somente o name
do pod, namespace
, em que
executar o pod, image
para usar e task_id
são obrigatórios.
Quando você colocar este snippet de código em um DAG, a configuração usará os
padrões em /home/airflow/composer_kube_config
. Não é necessário modificar o
código para que a tarefa pod-ex-minimum
seja bem-sucedida.
Configuração do modelo
O Airflow é compatível com o
modelo Jinja.
Você precisa declarar as variáveis necessárias (task_id
, name
, namespace
e image
) com o operador. Como mostra o exemplo a seguir, é possível
criar modelos de todos os outros parâmetros com o Jinja, incluindo cmds
, arguments
,
env_vars
e config_file
.
Sem alterar o DAG ou ambiente, a tarefa ex-kube-templates
falha devido a dois erros. Os registros mostram que a tarefa está falhando porque a
variável apropriada não existe (my_value
). O segundo erro, que você
pode receber após corrigir o primeiro erro, mostra que a tarefa falha porque
core/kube_config
não é encontrado em config
.
Para corrigir os dois erros, siga as etapas descritas mais detalhadamente.
Para definir my_value
com gcloud
ou a IU do Airflow:
IU do Airflow
Na IU do Airflow 2:
Acesse a IU do Airflow.
Na barra de ferramentas, selecione Administrador > Variáveis.
Na página Listar variáveis, clique em Adicionar um novo registro.
Na página Adicionar variável, digite as seguintes informações:
- Chave:
my_value
- Val:
example_value
- Chave:
Clique em Save.
gcloud
No Airflow 2, digite o seguinte comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Substitua:
ENVIRONMENT
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
Para se referir a um config_file
personalizado (um arquivo de configuração do Kubernetes),
modifique a opção de configuração kube_config
do Airflow para uma
configuração válida do Kubernetes:
Seção | Chave | Valor |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Aguarde alguns minutos para que o ambiente seja atualizado. Em seguida,
execute a tarefa ex-kube-templates
novamente e verifique se a
tarefa ex-kube-templates
é bem-sucedida.
Configuração de variáveis de secrets
Um secret do Kubernetes
é um objeto que contém dados sensíveis. É possível transmitir secrets para
os pods do Kubernetes usando o KubernetesPodOperator
.
Os secrets precisam ser definidos no Kubernetes ou o pod não será iniciado.
Neste exemplo, mostramos duas maneiras de usar os Kubernetes Secrets: como uma variável de ambiente e como um volume ativado pelo pod.
O primeiro secret, airflow-secrets
, é definido
como uma variável de ambiente do Kubernetes chamada SQL_CONN
(em vez de uma variável de ambiente do Airflow
ou do Cloud Composer).
O segundo secret, service-account
, instala service-account.json
, um arquivo
com um token de conta de serviço, em /var/secrets/google
.
Veja a aparência dos secrets:
O nome do primeiro secret do Kubernetes é definido na variável secret
.
Esse secret privado é chamado de airflow-secrets
. Ele é exposto como uma variável de ambiente, conforme ditado por deploy_type
. A variável
de ambiente definida para, deploy_target
, é SQL_CONN
. Por fim, o key
do
secret armazenado no deploy_target
é sql_alchemy_conn
.
O nome do segundo secret do Kubernetes é definido na variável secret
.
Esse secret privado é chamado de service-account
. Ele é exposto como um
volume, conforme determinado pelo deploy_type
. O caminho do arquivo a ser
ativado, deploy_target
, é /var/secrets/google
. Por fim, o key
do secret
armazenado no deploy_target
é service-account.json
.
Confira como é a configuração do operador:
Sem fazer alterações no DAG ou no ambiente, a tarefa
ex-kube-secrets
falha. Se você analisar os registros, a tarefa falhará devido
a um erro Pod took too long to start
. Esse erro ocorre porque o Airflow
não encontrou o secret especificado na configuração, secret_env
.
gcloud
Para definir o secret usando gcloud
:
Receba informações sobre o cluster de ambiente do Cloud Composer.
Execute este comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Substitua:
ENVIRONMENT
pelo nome do ambiente.LOCATION
é a região em que o ambiente do Cloud Composer está localizado.
A saída desse comando usa o seguinte formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Para receber o ID do cluster do GKE, copie a saída depois de
/clusters/
(termina em-gke
).
Conecte-se ao cluster do GKE executando este comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Substitua:
CLUSTER_ID
pelo ID do cluster do GKE.PROJECT
pelo ID do seu projeto do Google Cloud.LOCATION
é a região em que o ambiente do Cloud Composer está localizado.
Crie secrets do Kubernetes.
Execute o seguinte comando para criar um secret do Kubernetes que defina o valor de
sql_alchemy_conn
paratest_value
:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Crie um secret do Kubernetes que defina o valor de
service-account.json
como um caminho local de um arquivo de chave da conta de serviço chamadokey.json
executando o seguinte comando:kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Depois de definir os secrets, execute a tarefa
ex-kube-secrets
novamente na IU do Airflow.Verifique se a tarefa
ex-kube-secrets
foi bem-sucedida.
Configuração completa
Este exemplo mostra todas as variáveis que você pode configurar em KubernetesPodOperator
. Não é necessário modificar o código para que a tarefa
ex-all-configs
seja bem-sucedida.
Para ver detalhes sobre cada variável, consulte a
referência KubernetesPodOperator
do Airflow.
Informações sobre o provedor do Kubernetes da CNCF
O GKEStartPodOperator e o KubernetesPodOperator são implementados no
provedor apache-airflow-providers-cncf-kubernetes
.
Para notas da versão com falha do provedor do Kubernetes CNCF, consulte o site do provedor do Kubernetes da CNCF.
Versão 6.0.0
Na versão 6.0.0 do pacote do provedor do Kubernetes da CNCF,
a conexão kubernetes_default
é usada por padrão no
KubernetesPodOperator
.
Se você especificou uma conexão personalizada na versão 5.0.0, esta conexão personalizada
ainda é usado pelo operador. Para voltar a usar o kubernetes_default
talvez você queira ajustar seus DAGs adequadamente.
Versão 5.0.0
Esta versão introduz algumas mudanças incompatíveis com versões anteriores
em comparação com a versão 4.4.0. Os mais importantes estão relacionados
a conexão kubernetes_default
que não é usada na versão 5.0.0.
- A conexão
kubernetes_default
precisa ser modificada. Caminho de configuração do Kube precisa ser definido como/home/airflow/composer_kube_config
(conforme mostrado na Figura 1). Como alternativa, adicioneconfig_file
à configuraçãoKubernetesPodOperator
, conforme mostrado no código a seguir. exemplo).
- Modifique o código de uma tarefa usando o KubernetesPodOperator da seguinte maneira:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para mais informações sobre a versão 5.0.0, consulte as Notas de lançamento do provedor do Kubernetes CNCF.
Solução de problemas
Dicas para solucionar problemas de falhas no pod
Além de verificar os registros de tarefas na IU do Airflow, verifique também os seguintes registros:
Saída do programador e dos workers do Airflow:
No console do Google Cloud, acesse a página Ambientes.
Acesse o link DAGs para seu ambiente.
No bucket do ambiente, suba um nível.
Revise os registros na pasta
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Registros detalhados do pod no console do Google Cloud nas cargas de trabalho do GKE. Esses registros incluem o arquivo YAML de definição, os eventos e os detalhes do pod.
Códigos de retorno diferente de zero quando também usar GKEStartPodOperator
Ao usar KubernetesPodOperator
e GKEStartPodOperator
, o
código de retorno do ponto de entrada do contêiner determina se a tarefa é
considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam a falha.
Um padrão comum ao usar
KubernetesPodOperator
e GKEStartPodOperator
é executar um script de shell como o ponto de entrada do
contêiner para agrupar várias operações dentro do contêiner.
Se você estiver escrevendo esse script, recomendamos que inclua o comando set -e
na parte superior para que comandos com falha encerrem o script e propaguem a falha para a instância de tarefa do Airflow.
Tempos limite do pod
O tempo limite padrão para KubernetesPodOperator
é de 120 segundos, o que pode resultar na ocorrência de tempos limite antes do download de imagens maiores. É possível aumentar o tempo limite alterando o parâmetro startup_timeout_seconds
, quando você cria o KubernetesPodOperator
.
Quando um pod expira, o registro específico da tarefa fica disponível na IU do Airflow. Exemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Os tempos limite do pod também podem ocorrer Conta de serviço do Cloud Composer não tem as permissões de IAM necessárias para executar a tarefa em mão. Para verificar isso, veja os erros no nível do pod usando o Painéis do GKE para conferir os registros da sua uma carga de trabalho específica ou o Cloud Logging.
Falha ao estabelecer uma nova conexão
O upgrade automático é ativado por padrão nos clusters do GKE. Se um pool de nós estiver em um cluster que está fazendo um upgrade, você poderá receber este erro:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar se o cluster está sendo atualizado, acesse o console do Google Cloud Clusters do Kubernetes e procure o ícone de carregamento ao lado do cluster do seu ambiente de execução.