Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar o KubernetesPodOperator para implementar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine que faz parte do seu ambiente do Cloud Composer.
O KubernetesPodOperator inicia pods do Kubernetes no cluster do seu ambiente. Em comparação, os operadores do Google Kubernetes Engine executam pods do Kubernetes num cluster especificado, que pode ser um cluster separado não relacionado com o seu ambiente. Também pode criar e eliminar clusters através dos operadores do Google Kubernetes Engine.
O KubernetesPodOperator é uma boa opção se precisar de:
- Dependências personalizadas do Python que não estão disponíveis através do repositório público PyPI.
- Dependências binárias que não estão disponíveis na imagem de trabalho do Cloud Composer padrão.
Antes de começar
Se for usada a versão 5.0.0 do fornecedor do CNCF Kubernetes, siga as instruções documentadas na secção do fornecedor do CNCF Kubernetes.
A configuração de afinidade de pods não está disponível no Cloud Composer 2. Se quiser usar a afinidade de pods, use os operadores do GKE para iniciar pods num cluster diferente.
Acerca do KubernetesPodOperator no Cloud Composer 2
Esta secção descreve como funciona o KubernetesPodOperator no Cloud Composer 2.
Utilização de recursos
No Cloud Composer 2, o cluster do seu ambiente é dimensionado automaticamente. As cargas de trabalho adicionais que executa com o KubernetesPodOperator são dimensionadas independentemente do seu ambiente.
O seu ambiente não é afetado pelo aumento da procura de recursos, mas o cluster do seu ambiente é dimensionado para cima e para baixo consoante a procura de recursos.
O preço das cargas de trabalho adicionais que executa no cluster do seu ambiente segue o modelo de preços do Cloud Composer 2 e usa SKUs de computação do Cloud Composer.
O Cloud Composer 2 usa clusters do Autopilot que introduzem a noção de classes de computação:
O Cloud Composer só suporta a classe de computação
general-purpose
.Por predefinição, se não for selecionada nenhuma classe, a classe
general-purpose
é assumida quando cria pods com o KubernetesPodOperator.Cada classe está associada a propriedades específicas e limites de recursos. Pode ler sobre elas na documentação do Autopilot. Por exemplo, os pods que são executados na classe
general-purpose
podem usar até 110 GiB de memória.
Acesso aos recursos do projeto
O Cloud Composer 2 usa clusters do GKE com a
Workload Identity Federation para o GKE. Os pods executados no espaço de nomes composer-user-workloads
podem aceder Google Cloud aos recursos no seu projeto sem
configuração adicional. A conta de serviço do seu ambiente
é usada para aceder a estes recursos.
Se quiser usar um espaço de nomes personalizado, as contas de serviço do Kubernetes associadas a este espaço de nomes têm de ser mapeadas para contas de serviço, para ativar a autorização de identidade de serviço para pedidos às APIs Google e a outros serviços. Google Cloud Se executar pods num espaço de nomes personalizado no cluster do seu ambiente, as associações de IAM entre o Kubernetes e as contas de serviço não são criadas, e estes pods não podem aceder aos recursos do seu projeto.Google Cloud Google Cloud
Se usar um espaço de nomes personalizado e quiser que os seus pods tenham acesso a Google Cloud recursos, então siga as orientações na Federação do Workload Identity para o GKE e configure as associações para um espaço de nomes personalizado:
- Crie um namespace separado no cluster do seu ambiente.
- Crie uma associação entre a conta de serviço do Kubernetes do espaço de nomes personalizado e a conta de serviço do seu ambiente.
- Adicione a anotação da conta de serviço do seu ambiente à conta de serviço do Kubernetes.
- Quando usa o KubernetesPodOperator, especifique o espaço de nomes e a conta de serviço do Kubernetes nos parâmetros
namespace
eservice_account_name
.
Configuração mínima
Para criar um KubernetesPodOperator, apenas são necessários os parâmetros name
, image
e task_id
do Pod a usar. O /home/airflow/composer_kube_config
contém credenciais para autenticar no GKE.
Configuração adicional
Este exemplo mostra parâmetros adicionais que pode configurar no KubernetesPodOperator.
Consulte os seguintes recursos para mais informações:
Para ver informações sobre a utilização de segredos e mapas de configuração do Kubernetes, consulte o artigo Use segredos e mapas de configuração do Kubernetes.
Para obter informações sobre a utilização de modelos Jinja com o KubernetesPodOperator, consulte o artigo Utilize modelos Jinja.
Para obter informações sobre os parâmetros KubernetesPodOperator, consulte a referência do operador na documentação do Airflow.
Use modelos Jinja
O Airflow suporta modelos Jinja em DAGs.
Tem de declarar os parâmetros do Airflow necessários (task_id
, name
e image
) com o operador. Conforme mostrado no exemplo seguinte, pode usar modelos para todos os outros parâmetros com o Jinja, incluindo cmds
, arguments
, env_vars
e config_file
.
O parâmetro env_vars
no exemplo é definido a partir de uma
variável do Airflow denominada my_value
. O DAG de exemplo obtém o respetivo valor da variável de modelo vars
no Airflow. O Airflow tem mais variáveis que dão acesso a diferentes tipos de informações. Por exemplo, pode usar a variável de modelo conf
para aceder aos valores das opções de configuração do Airflow. Para mais informações e a lista de variáveis disponíveis no Airflow, consulte a referência de modelos na documentação do Airflow.
Sem alterar o DAG nem criar a variável env_vars
, a tarefa ex-kube-templates
no exemplo falha porque a variável não existe. Crie esta variável na IU do Airflow ou com a CLI Google Cloud:
IU do Airflow
Aceda à IU do Airflow.
Na barra de ferramentas, selecione Administração > Variáveis.
Na página Variável de lista, clique em Adicionar um novo registo.
Na página Adicionar variável, introduza as seguintes informações:
- Tecla:
my_value
- Val:
example_value
- Tecla:
Clique em Guardar.
gcloud
Introduza o seguinte comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Substituir:
ENVIRONMENT
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.
O exemplo seguinte demonstra como usar modelos Jinja com KubernetesPodOperator:
Use segredos e ConfigMaps do Kubernetes
Um segredo do Kubernetes é um objeto que contém dados confidenciais. Um ConfigMap do Kubernetes é um objeto que contém dados não confidenciais em pares de chave/valor.
No Cloud Composer 2, pode criar segredos e ConfigMaps através da CLI Google Cloud, da API ou do Terraform e, em seguida, aceder aos mesmos a partir do KubernetesPodOperator.
Acerca dos ficheiros de configuração YAML
Quando cria um segredo do Kubernetes ou um ConfigMap através da Google Cloud CLI e da API, fornece um ficheiro no formato YAML. Este ficheiro tem de seguir o mesmo formato usado pelos segredos e pelos mapas de configuração do Kubernetes. A documentação do Kubernetes fornece muitos exemplos de código de ConfigMaps e Secrets. Para começar, pode consultar a página Distribua credenciais de forma segura através de segredos e ConfigMaps.
Tal como nos segredos do Kubernetes, use a representação base64 quando definir valores nos segredos.
Para codificar um valor, pode usar o seguinte comando (esta é uma das muitas formas de obter um valor codificado em base64):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Saída:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Os dois exemplos de ficheiros YAML seguintes são usados em exemplos mais adiante neste guia. Exemplo de ficheiro de configuração YAML para um secret do Kubernetes:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Outro exemplo que demonstra como incluir ficheiros. Tal como no exemplo anterior, primeiro codifique o conteúdo de um ficheiro (cat ./key.json | base64
) e, em seguida, faculte este valor no ficheiro YAML:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Um exemplo de um ficheiro de configuração YAML para um ConfigMap. Não precisa de usar a representação base64 em ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Faça a gestão dos segredos do Kubernetes
No Cloud Composer 2, cria segredos através da CLI do Google Cloud e kubectl
:
Obtenha informações sobre o cluster do seu ambiente:
Execute o seguinte comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Substituir:
ENVIRONMENT
com o nome do seu ambiente.LOCATION
com a região onde o ambiente do Cloud Composer está localizado.
O resultado deste comando usa o seguinte formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Para obter o ID do cluster do GKE, copie o resultado após
/clusters/
(termina em-gke
).
Estabeleça ligação ao seu cluster do GKE com o seguinte comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Substitua o seguinte:
CLUSTER_ID
: o ID do cluster do ambiente.PROJECT_ID
: o ID do projeto.LOCATION
: a região onde o ambiente está localizado.
Crie segredos do Kubernetes:
Os comandos seguintes demonstram duas abordagens diferentes para criar segredos do Kubernetes. A abordagem
--from-literal
usa pares de chave-valor. A abordagem--from-file
usa o conteúdo dos ficheiros.Para criar um Kubernetes Secret fornecendo pares de chave-valor, execute o seguinte comando. Este exemplo cria um segredo denominado
airflow-secrets
que tem um camposql_alchemy_conn
com o valor detest_value
.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Para criar um segredo do Kubernetes fornecendo o conteúdo do ficheiro, execute o comando seguinte. Este exemplo cria um segredo denominado
service-account
que tem o camposervice-account.json
com o valor retirado do conteúdo de um ficheiro./key.json
local.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Use segredos do Kubernetes nos seus DAGs
Este exemplo mostra duas formas de usar segredos do Kubernetes: como uma variável de ambiente e como um volume montado pelo pod.
O primeiro secret, airflow-secrets
, está definido como uma variável de ambiente do Kubernetes denominada SQL_CONN
(em oposição a uma variável de ambiente do Airflow ou do Cloud Composer).
O segundo segredo, service-account
, monta service-account.json
, um ficheiro com um token de conta de serviço, em /var/secrets/google
.
Veja o aspeto dos objetos secretos:
O nome do primeiro segredo do Kubernetes é definido na variável secret_env
.
Este Secret tem o nome airflow-secrets
. O parâmetro deploy_type
especifica que tem de ser exposto como uma variável de ambiente. O nome da variável de ambiente é SQL_CONN
, conforme especificado no parâmetro deploy_target
. Por último, o valor da variável de ambiente SQL_CONN
é definido para o valor da chave sql_alchemy_conn
.
O nome do segundo secret do Kubernetes é definido na variável secret_volume
. Este Secret tem o nome service-account
. É exposto como um volume, conforme especificado no parâmetro deploy_type
. O caminho do ficheiro a
montar, deploy_target
, é /var/secrets/google
. Por fim, o key
do segredo armazenado no deploy_target
é service-account.json
.
Veja o aspeto da configuração do operador:
Informações sobre o fornecedor do Kubernetes da CNCF
O KubernetesPodOperator é implementado no fornecedor apache-airflow-providers-cncf-kubernetes
.
Para ver notas de lançamento detalhadas do fornecedor do CNCF Kubernetes, consulte o Website do fornecedor do CNCF Kubernetes.
Versão 6.0.0
Na versão 6.0.0 do pacote do fornecedor Kubernetes da CNCF, a ligação kubernetes_default
é usada por predefinição no KubernetesPodOperator.
Se especificou uma ligação personalizada na versão 5.0.0, esta ligação personalizada continua a ser usada pelo operador. Para voltar a usar a ligação kubernetes_default
, é recomendável ajustar os DAGs em conformidade.
Versão 5.0.0
Esta versão introduz algumas alterações incompatíveis com versões anteriores
em comparação com a versão 4.4.0. As mais importantes estão relacionadas com a ligação kubernetes_default
, que não é usada na versão 5.0.0.
- A associação
kubernetes_default
tem de ser modificada. O caminho de configuração do Kubernetes tem de estar definido como/home/airflow/composer_kube_config
(conforme mostrado na figura seguinte). Em alternativa, tem de adicionarconfig_file
à configuração do KubernetesPodOperator (conforme mostrado no exemplo de código seguinte).

- Modifique o código de uma tarefa com o KubernetesPodOperator da seguinte forma:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para mais informações acerca da versão 5.0.0, consulte as notas de lançamento do fornecedor do CNCF Kubernetes.
Resolução de problemas
Esta secção oferece conselhos para resolver problemas comuns do KubernetesPodOperator:
Ver registos
Ao resolver problemas, pode verificar os registos pela seguinte ordem:
Registos de tarefas do Airflow:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador DAGs.
Clique no nome do DAG e, de seguida, clique na execução do DAG para ver os detalhes e os registos.
Registos do programador do Airflow:
Aceda à página Detalhes do ambiente.
Aceda ao separador Registos.
Inspecione os registos do programador do Airflow.
Registos de pods na Google Cloud consola, em cargas de trabalho do GKE. Estes registos incluem o ficheiro YAML de definição do pod, os eventos do pod e os detalhes do pod.
Códigos de retorno diferentes de zero
Quando usar o KubernetesPodOperator (e o GKEStartPodOperator), o código de retorno do ponto de entrada do contentor determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam uma falha.
Um padrão comum é executar um script de shell como o ponto de entrada do contentor para agrupar várias operações no contentor.
Se estiver a escrever um script deste tipo, recomendamos que inclua o comando set -e
na parte superior do script para que os comandos com falhas no script terminem o script e propaguem a falha para a instância da tarefa do Airflow.
Limites de tempo de agrupamentos
O limite de tempo predefinido para KubernetesPodOperator é de 120 segundos, o que pode resultar em limites de tempo que ocorrem antes da transferência de imagens maiores. Pode aumentar o tempo limite alterando o parâmetro startup_timeout_seconds
quando cria o KubernetesPodOperator.
Quando um pod excede o limite de tempo, o registo específico da tarefa está disponível na IU do Airflow. Por exemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Os tempos limite dos pods também podem ocorrer quando a conta de serviço do Cloud Composer não tem as autorizações do IAM necessárias para realizar a tarefa em questão. Para verificar isto, consulte os erros ao nível do pod através dos painéis de controlo do GKE para ver os registos da sua carga de trabalho específica ou use o Cloud Logging.
Falha ao estabelecer uma nova ligação
A atualização automática está ativada por predefinição nos clusters do GKE. Se um node pool estiver num cluster que está a ser atualizado, pode ver o seguinte erro:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar se o cluster está a ser atualizado, na Google Cloud consola, aceda à página Clusters Kubernetes e procure o ícone de carregamento junto ao nome do cluster do seu ambiente.