Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página explica como criar uma arquitetura push baseada em eventos acionando DAGs do Cloud Composer em resposta a alterações de tópicos do Pub/Sub. Os exemplos neste tutorial demonstram o processamento do ciclo completo da gestão do Pub/Sub, incluindo a gestão de subscrições, como parte do processo DAG. É adequado para alguns dos exemplos de utilização comuns quando precisa de acionar DAGs, mas não quer configurar autorizações de acesso adicionais.
Por exemplo, as mensagens enviadas através do Pub/Sub podem ser usadas como solução se não quiser conceder acesso direto a um ambiente do Cloud Composer por motivos de segurança. Pode configurar uma função do Cloud Run que cria mensagens do Pub/Sub e as publica num tópico do Pub/Sub. Em seguida, pode criar um DAG que extrai mensagens do Pub/Sub e, em seguida, processa estas mensagens.
Neste exemplo específico, cria uma função do Cloud Run e implementa dois DAGs. O primeiro DAG extrai mensagens do Pub/Sub e aciona o segundo DAG de acordo com o conteúdo da mensagem do Pub/Sub.
Este tutorial pressupõe que tem conhecimentos de Python e da Google Cloud consola.
Objetivos
Custos
Este tutorial usa os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer (consulte também os custos adicionais)
- Pub/Sub
- Funções do Cloud Run
Depois de concluir este tutorial, pode evitar a faturação contínua eliminando os recursos que criou. Consulte Limpar para ver mais detalhes.
Antes de começar
Para este tutorial, precisa de um Google Cloud projeto. Configure o projeto da seguinte forma:
Na Google Cloud consola, selecione ou crie um projeto:
Certifique-se de que a faturação está ativada para o seu projeto. Saiba como verificar se a faturação está ativada num projeto.
Certifique-se de que o utilizador do Google Cloud projeto tem as seguintes funções para criar os recursos necessários:
- Utilizador da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Pub/Sub (
roles/pubsub.editor
) - Administrador de objetos de ambiente e armazenamento
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador das funções do Cloud Run (
roles/cloudfunctions.admin
) - Visualizador de registos (
roles/logging.viewer
)
- Utilizador da conta de serviço (
Certifique-se de que a conta de serviço que executa a sua função do Cloud Run tem autorizações suficientes no seu projeto para aceder ao Pub/Sub. Por predefinição, as funções do Cloud Run usam a conta de serviço predefinida do App Engine. Esta conta de serviço tem a função de Editor, que tem autorizações suficientes para este tutorial.
Ative APIs para o seu projeto
Consola
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Ative a API Cloud Composer no seu projeto adicionando as seguintes definições de recursos ao seu script do Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Substitua <PROJECT_ID>
pelo ID do projeto
do seu projeto. Por exemplo, example-project
.
Crie o seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Como parte deste procedimento,
concede a função Extensão do agente de serviço da API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) à conta do agente de serviço do Composer. O Cloud Composer usa esta conta para realizar operações no seu Google Cloud projeto.
Crie um tópico do Pub/Sub
Este exemplo aciona um DAG em resposta a uma mensagem enviada para um tópico do Pub/Sub. Crie um tópico Pub/Sub para usar neste exemplo:
Consola
Na Google Cloud consola, aceda à página Tópicos do Pub/Sub.
Clique em Criar tópico.
No campo ID do tópico, introduza
dag-topic-trigger
como ID do seu tópico.Deixe as outras opções nas predefinições.
Clique em Criar tópico.
gcloud
Para criar um tópico, execute o comando gcloud pubsub topics create na CLI gcloud:
gcloud pubsub topics create dag-topic-trigger
Terraform
Adicione as seguintes definições de recursos ao seu script do Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Substitua <PROJECT_ID>
pelo ID do projeto
do seu projeto. Por exemplo, example-project
.
Carregue os seus DAGs
Carregue DAGs para o seu ambiente:
- Guarde o seguinte ficheiro DAG no seu computador local.
- Substitua
<PROJECT_ID>
pelo ID do projeto do seu projeto. Por exemplo,example-project
. - Carregue o ficheiro DAG editado para o seu ambiente.
O código de exemplo contém dois DAGs: trigger_dag
e target_dag
.
O trigger_dag
DAG subscreve um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG dos dados da mensagem do Pub/Sub. Neste exemplo, trigger_dag
aciona
o DAG target_dag
, que envia mensagens para os registos de tarefas.
O DAG trigger_dag
contém as seguintes tarefas:
subscribe_task
: subscreva um tópico do Pub/Sub.pull_messages_operator
: leia os dados de uma mensagem do Pub/Sub comPubSubPullOperator
.trigger_target_dag
: acionar outro DAG (neste exemplo,target_dag
) de acordo com os dados nas mensagens extraídas do tópico do Pub/Sub.
O DAG contém apenas uma tarefa: output_to_logs
.target_dag
Esta tarefa
imprime mensagens no registo de tarefas com um atraso de um segundo.
Implemente uma função do Cloud Run que publique mensagens num tópico do Pub/Sub
Nesta secção, implementa uma função do Cloud Run que publica mensagens num tópico do Pub/Sub.
Crie uma função do Cloud Run e especifique a respetiva configuração
Consola
Na Google Cloud consola, aceda à página Funções do Cloud Run.
Clique em Criar função.
No campo Ambiente, selecione 1.ª geração.
No campo Nome da função, introduza o nome da função:
pubsub-publisher
.No campo Tipo de acionador, selecione HTTP.
Na secção Autenticação, selecione Permitir invocações não autenticadas. Esta opção concede aos utilizadores não autenticados a capacidade de invocar uma função HTTP.
Clique em Guardar.
Clique em Seguinte para avançar para o passo Código.
Terraform
Considere usar a Google Cloud consola para este passo, porque não existe uma forma simples de gerir o código fonte da função a partir do Terraform.
Este exemplo demonstra como pode carregar uma função do Cloud Run a partir de um ficheiro de arquivo ZIP local criando um contentor do Cloud Storage, armazenando o ficheiro neste contentor e, em seguida, usando o ficheiro do contentor como origem para a função do Cloud Run. Se usar esta abordagem, o Terraform não atualiza automaticamente o código fonte da sua função, mesmo que crie um novo ficheiro de arquivo. Para voltar a carregar o código da função, pode alterar o nome do ficheiro do arquivo.
- Transfira os ficheiros
pubsub_publisher.py
erequirements.txt
. - No ficheiro
pubsub_publisher.py
, substitua<PROJECT_ID>
pelo ID do projeto do seu projeto. Por exemplo,example-project
. - Crie um arquivo ZIP com o nome
pubsub_function.zip
com o ficheiropbusub_publisner.py
e o ficheirorequirements.txt
. - Guarde o arquivo ZIP num diretório onde o script do Terraform está armazenado.
- Adicione as seguintes definições de recursos ao seu script do Terraform e substitua
<PROJECT_ID>
pelo ID do projeto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Especifique os parâmetros do código da função do Cloud Run
Consola
No passo Código, no campo Tempo de execução, selecione o tempo de execução da linguagem que a sua função usa. Neste exemplo, selecione Python 3.10.
No campo Ponto de entrada, introduza
pubsub_publisher
. Este é o código que é executado quando a sua função do Cloud Run é executada. O valor desta flag tem de ser um nome de função ou um nome de classe totalmente qualificado que exista no seu código fonte.
Terraform
Ignorar este passo. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function
.
Carregue o código da função do Cloud Run
Consola
No campo Código fonte, selecione a opção adequada para indicar como fornece o código fonte da função. Neste tutorial, adicione o código da função através do editor inline das funções do Cloud Run. Em alternativa, pode carregar um ficheiro ZIP ou usar os Cloud Source Repositories.
- Coloque o seguinte exemplo de código no ficheiro main.py.
- Substitua
<PROJECT_ID>
pelo ID do projeto do seu projeto. Por exemplo,example-project
.
Terraform
Ignorar este passo. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function
.
Especifique as dependências da função do Cloud Run
Consola
Especifique as dependências das funções no ficheiro de metadados requirements.txt:
Quando implementa a sua função, as funções do Cloud Run transferem e instalam as dependências declaradas no ficheiro requirements.txt, uma linha por pacote.
Este ficheiro tem de estar no mesmo diretório que o ficheiro main.py que contém o código da sua função. Para ver mais detalhes, consulte os
ficheiros de requisitos
na pip
documentação.
Terraform
Ignorar este passo. As dependências da função do Cloud Run são definidas no ficheiro requirements.txt
no arquivo pubsub_function.zip
.
Implemente a sua função do Cloud Run
Consola
Clique em Implementar. Quando a implementação termina com êxito, a função aparece com uma marca de verificação verde na página Funções do Cloud Run na Google Cloud consola.
Certifique-se de que a conta de serviço que executa a sua função do Cloud Run tem autorizações suficientes no seu projeto para aceder ao Pub/Sub.
Terraform
Inicialize o Terraform:
terraform init
Reveja a configuração e verifique se os recursos que o Terraform vai criar ou atualizar correspondem às suas expetativas:
terraform plan
Para verificar se a sua configuração é válida, execute o seguinte comando:
terraform validate
Aplique a configuração do Terraform executando o seguinte comando e introduzindo yes no comando:
terraform apply
Aguarde até que o Terraform apresente a mensagem "Apply complete!" (Aplicação concluída!).
Na Google Cloud consola, navegue para os seus recursos na IU para se certificar de que o Terraform os criou ou atualizou.
Teste a sua função do Cloud Run
Para verificar se a sua função publica uma mensagem num tópico Pub/Sub e se os DAGs de exemplo funcionam conforme previsto:
Verifique se os DAGs estão ativos:
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador DAGs.
Verifique os valores na coluna Estado para DAGs com os nomes
trigger_dag
etarget_dag
. Ambos os DAGs têm de estar no estadoActive
.
Envie uma mensagem Pub/Sub de teste. Pode fazê-lo no Cloud Shell:
Na Google Cloud consola, aceda à página Funções.
Clique no nome da sua função,
pubsub-publisher
.Aceda ao separador Testes.
Na secção Configurar evento de acionamento, introduza o seguinte par chave-valor JSON:
{"message": "target_dag"}
. Não modifique o par chave-valor, porque esta mensagem aciona o DAG de teste mais tarde.Na secção Comando de teste, clique em Testar no Cloud Shell.
No terminal do Cloud Shell, aguarde até que um comando seja apresentado automaticamente. Prima
Enter
para executar este comando.Se for apresentada a mensagem Autorizar Cloud Shell, clique em Autorizar.
Verifique se o conteúdo da mensagem corresponde à mensagem do Pub/Sub. Neste exemplo, a mensagem de saída tem de começar com
Message b'target_dag' with message_length 10 published to
como resposta da sua função.
Verifique se o
target_dag
foi acionado:Aguarde, pelo menos, um minuto para que uma nova execução do DAG de
trigger_dag
seja concluída.Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Aceda ao separador DAGs.
Clique em
trigger_dag
para aceder à página Detalhes do DAG. No separador Execuções, é apresentada uma lista de execuções de DAG para o DAGtrigger_dag
.Este DAG é executado a cada minuto e processa todas as mensagens do Pub/Sub enviadas a partir da função. Se não forem enviadas mensagens, a tarefa
trigger_target
é marcada comoSkipped
nos registos de execução do DAG. Se os DAGs foram acionados, a tarefatrigger_target
é marcada comoSuccess
.Examine várias execuções recentes do DAG para localizar uma execução do DAG em que todas as três tarefas (
subscribe_task
,pull_messages_operator
etrigger_target
) estão nos estadosSuccess
.Volte ao separador DAGs e verifique se a coluna Execuções bem-sucedidas do DAG
target_dag
apresenta uma execução bem-sucedida.
Resumo
Neste tutorial, aprendeu a usar as funções do Cloud Run para publicar mensagens num tópico do Pub/Sub e implementar um DAG que subscreve um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG dos dados da mensagem.
Também existem formas alternativas de criar e gerir subscrições do Pub/Sub e acionar DAGs que estão fora do âmbito deste tutorial. Por exemplo, pode usar funções do Cloud Run para acionar DAGs do Airflow quando ocorre um evento especificado. Consulte os nossos tutoriais para experimentar as outras Google Cloud funcionalidades.
Limpar
Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.
Elimine o projeto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimine recursos individuais
Se planeia explorar vários tutoriais e inícios rápidos, a reutilização de projetos pode ajudar a evitar exceder os limites de quota do projeto.
Consola
- Elimine o ambiente do Cloud Composer. Também elimina o contentor do ambiente durante este procedimento.
- Elimine o tópico do Pub/Sub,
dag-topic-trigger
. Elimine a função do Cloud Run.
Na Google Cloud consola, aceda às funções do Cloud Run.
Clique na caixa de verificação da função que quer eliminar,
pubsub-publisher
.Clique em Eliminar e, de seguida, siga as instruções.
Terraform
- Certifique-se de que o script do Terraform não contém entradas para recursos que o seu projeto ainda precisa. Por exemplo, pode querer manter algumas APIs ativadas e as autorizações do IAM ainda atribuídas (se adicionou essas definições ao seu script do Terraform).
- Corrida
terraform destroy
. - Elimine manualmente o contentor do ambiente. O Cloud Composer não a elimina automaticamente. Pode fazê-lo a partir da Google Cloud consola ou da Google Cloud CLI.
O que se segue?
- Testar DAGs
- Testar funções HTTP
- Implemente uma função do Cloud Run
- Experimente outras Google Cloud funcionalidades. Consulte os nossos tutoriais.