Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página orienta você a criar uma arquitetura de push baseada em eventos acionando DAGs do Cloud Composer em resposta a mudanças de tópico do Pub/Sub. Os exemplos neste tutorial demonstram o processamento do ciclo completo de gerenciamento do Pub/Sub, incluindo o gerenciamento de assinaturas, como parte do processo DAG. É adequado para alguns dos casos de uso comuns quando você precisa acionar DAGs, mas não quer configurar permissões de acesso extras.
Por exemplo, as mensagens enviadas pelo Pub/Sub podem ser usadas como uma solução se você não quiser fornecer acesso direto a um ambiente do Cloud Composer por motivos de segurança. É possível configurar uma função do Cloud Run que cria mensagens do Pub/Sub e as publica em um tópico do Pub/Sub. Em seguida, crie um DAG que extraia e processe as mensagens do Pub/Sub.
Neste exemplo específico, você cria uma função do Cloud Run e implanta duas DAGs. O primeiro DAG extrai mensagens do Pub/Sub e aciona o segundo DAG de acordo com o conteúdo da mensagem do Pub/Sub.
Neste tutorial, consideramos que você já conhece o Python e o console do Google Cloud.
Objetivos
Custos
Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer (consulte também os custos adicionais).
- Pub/Sub
- Cloud Run functions
Ao concluir este tutorial, exclua os recursos criados para evitar a continuidade no faturamento. Consulte Limpeza para mais detalhes.
Antes de começar
Para este tutorial, você precisa de um Google Cloud projeto. Configure o projeto da seguinte maneira:
No console do Google Cloud, selecione ou crie um projeto:
Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Verifique se o Google Cloud usuário do projeto tem os seguintes papéis para criar os recursos necessários:
- Usuário da conta de serviço (
roles/iam.serviceAccountUser
) - Editor do Pub/Sub (
roles/pubsub.editor
) - Administrador de objetos do armazenamento e do ambiente
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de funções do Cloud Run (
roles/cloudfunctions.admin
) - Visualizador de registros (
roles/logging.viewer
)
- Usuário da conta de serviço (
Verifique se a conta de serviço que executa a função do Cloud Run tem permissões suficientes no projeto para acessar o Pub/Sub. Por padrão, as funções do Cloud Run usam a conta de serviço padrão do App Engine. Essa conta de serviço tem o papel de editor, que tem permissões suficientes para este tutorial.
Ativar as APIs do projeto
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Ative a API Cloud Composer no seu projeto adicionando as seguintes definições de recursos ao script do Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Substitua <PROJECT_ID>
pelo ID do projeto
do seu projeto. Por exemplo, example-project
.
Criar seu ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Como parte deste procedimento,
você concede o papel Extensão do agente de serviço da API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) à conta do agente de serviço do Composer. O Cloud Composer usa essa conta para realizar operações
no projeto Google Cloud .
Criar um tópico do Pub/Sub
Este exemplo aciona um DAG em resposta a uma mensagem enviada para um tópico do Pub/Sub. Crie um tópico do Pub/Sub para usar neste exemplo:
Console
No console do Google Cloud, acesse a página Tópicos do Pub/Sub.
Clique em Criar tópico.
No campo ID do tópico, insira
dag-topic-trigger
como um ID para o tópico.Deixe as outras opções com os valores padrão.
Clique em Criar tópico.
gcloud
Para criar um tópico, execute o comando gcloud pubsub topics create na CLI do Google Cloud:
gcloud pubsub topics create dag-topic-trigger
Terraform
Adicione as seguintes definições de recursos ao seu script do Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Substitua <PROJECT_ID>
pelo ID do projeto
do seu projeto. Por exemplo, example-project
.
Fazer upload dos seus DAGs
Faça upload de DAGs para o ambiente:
- Salve o seguinte arquivo DAG no seu computador local.
- Substitua
<PROJECT_ID>
pelo ID do projeto do seu projeto. Por exemplo,example-project
. - Faça o upload do arquivo DAG editado para o ambiente.
O código de exemplo contém dois DAGs: trigger_dag
e target_dag
.
O DAG trigger_dag
se inscreve em um tópico do Pub/Sub, extrai
mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG
dos dados da mensagem do Pub/Sub. Neste exemplo, trigger_dag
aciona
o DAG target_dag
, que gera mensagens para os registros de tarefas.
O DAG trigger_dag
contém as seguintes tarefas:
subscribe_task
: inscreva-se em um tópico do Pub/Sub.pull_messages_operator
: leia os dados de uma mensagem do Pub/Sub comPubSubPullOperator
.trigger_target_dag
: aciona outro DAG (neste exemplo,target_dag
) de acordo com os dados nas mensagens extraídas do tópico do Pub/Sub.
O DAG target_dag
contém apenas uma tarefa: output_to_logs
. Essa tarefa
imprime mensagens no registro de tarefas com um segundo de atraso.
Implantar uma função do Cloud Run que publica mensagens em um tópico do Pub/Sub
Nesta seção, você implanta uma função do Cloud Run que publica mensagens em um tópico do Pub/Sub.
Criar uma função do Cloud Run e especificar a configuração
Console
No console do Google Cloud, acesse a página das funções do Cloud Run.
Clique em Criar função.
No campo Ambiente, selecione 1ª geração.
No campo Nome da função, insira o nome da função:
pubsub-publisher
.No campo Tipo de gatilho, selecione HTTP.
Na seção Autenticação, selecione Permitir invocações não autenticadas. Essa opção permite que usuários não autenticados invoquem uma função HTTP.
Clique em "Salvar".
Clique em Próximo para acessar a etapa Código.
Terraform
Considere usar o console do Google Cloud para essa etapa, porque não há uma maneira simples de gerenciar o código-fonte da função no Terraform.
Este exemplo demonstra como fazer upload de uma função do Cloud Run de um arquivo de arquivamento ZIP local criando um bucket do Cloud Storage, armazenando o arquivo nesse bucket e usando o arquivo do bucket como uma fonte para a função do Cloud Run. Se você usar essa abordagem, o Terraform não vai atualizar automaticamente o código-fonte da função, mesmo que você crie um novo arquivo. Para fazer o upload do código da função novamente, mude o nome do arquivo.
- Faça o download dos arquivos
pubsub_publisher.py
erequirements.txt
. - No arquivo
pubsub_publisher.py
, substitua<PROJECT_ID>
pelo ID do projeto. Por exemplo,example-project
. - Crie um arquivo ZIP chamado
pubsub_function.zip
com opbusub_publisner.py
e o arquivorequirements.txt
. - Salve o arquivo ZIP em um diretório onde o script do Terraform esteja armazenado.
- Adicione as seguintes definições de recursos ao script do Terraform e substitua
<PROJECT_ID>
pelo ID do projeto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Especificar parâmetros de código da função do Cloud Run
Console
Na etapa Code, no campo Runtime, selecione o ambiente de execução da linguagem que a função usa. Neste exemplo, selecione Python 3.10.
No campo Ponto de entrada, digite
pubsub_publisher
. Esse é o código que é executado quando a função do Cloud Run é executada. O valor dessa flag precisa ser um nome de função ou de classe totalmente qualificado que exista no código-fonte.
Terraform
Pule esta etapa. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function
.
Fazer upload do código da função do Cloud Run
Console
No campo Código-fonte, selecione a opção adequada para saber como fornecer o código-fonte da função. Neste tutorial, adicione o código da função usando o editor inline das funções do Cloud Run. Como alternativa, você pode fazer upload de um arquivo ZIP ou usar o Cloud Source Repositories.
- Coloque o exemplo de código abaixo no arquivo main.py.
- Substitua
<PROJECT_ID>
pelo ID do projeto do seu projeto. Por exemplo,example-project
.
Terraform
Pule esta etapa. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function
.
Especificar as dependências da função do Cloud Run
Console
Especifique as dependências da função no arquivo de metadados requirements.txt:
Quando você implanta sua função, as funções do Cloud Run fazem o download e instalam
as dependências declaradas no arquivo requirements.txt, uma linha por pacote.
Esse arquivo precisa estar no mesmo diretório do arquivo main.py que contém
o código da função. Para mais detalhes, consulte
Arquivos de requisitos
na documentação do pip
.
Terraform
Pule esta etapa. As dependências de função do Cloud Run são definidas no
arquivo requirements.txt
no arquivo pubsub_function.zip
.
Implantar a função do Cloud Run
Console
Clique em Implantar. Quando a implantação for concluída, a função vai aparecer com uma marca de seleção verde na página Funções do Cloud Run no console do Google Cloud.
Verifique se a conta de serviço que executa a função do Cloud Run tem permissões suficientes no seu projeto para acessar o Pub/Sub.
Terraform
Inicialize o Terraform:
terraform init
Revise a configuração e verifique se os recursos que o Terraform vai criar ou atualizar correspondem às suas expectativas:
terraform plan
Para verificar se a configuração é válida, execute o seguinte comando:
terraform validate
Para aplicar a configuração do Terraform, execute o comando a seguir e digite "yes" no prompt:
terraform apply
Aguarde até que o Terraform exiba a mensagem "Apply complete!".
No console do Google Cloud, navegue até seus recursos na IU para verificar se foram criados ou atualizados pelo Terraform.
Testar a função do Cloud Run
Para verificar se a função publica uma mensagem em um tópico do Pub/Sub e se os DAGs de exemplo funcionam conforme o esperado:
Verifique se os DAGs estão ativos:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia DAGs.
Verifique os valores na coluna State para DAGs com os nomes
trigger_dag
etarget_dag
. Os dois DAGs precisam estar no estadoActive
.
Envie uma mensagem de teste do Pub/Sub. É possível fazer isso no Cloud Shell:
No console do Google Cloud, acesse a página Funções.
Clique no nome da função,
pubsub-publisher
.Acesse a guia Testes.
Na seção Configurar evento acionador, insira o seguinte valor-chave JSON:
{"message": "target_dag"}
. Não modifique o par de chave-valor, porque essa mensagem aciona o DAG de teste mais tarde.Na seção Comando de teste, clique em Testar no Cloud Shell.
No terminal do Cloud Shell, aguarde até que um comando apareça automaticamente. Execute este comando pressionando
Enter
.Se a mensagem Autorizar o Cloud Shell aparecer, clique em Autorizar.
Confira se o conteúdo da mensagem corresponde à mensagem do Pub/Sub. Neste exemplo, a mensagem de saída precisa começar com
Message b'target_dag' with message_length 10 published to
como uma resposta da função.
Verifique se
target_dag
foi acionado:Aguarde pelo menos um minuto para que uma nova execução de DAG de
trigger_dag
seja concluída.No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia DAGs.
Clique em
trigger_dag
para acessar a página de detalhes do DAG. Na guia Execuções, uma lista de execuções de DAG para o DAGtrigger_dag
é mostrada.Esse DAG é executado a cada minuto e processa todas as mensagens do Pub/Sub enviadas pela função. Se nenhuma mensagem for enviada, a tarefa
trigger_target
será marcada comoSkipped
nos registros de execução do DAG. Se os DAGs forem acionados, a tarefatrigger_target
será marcada comoSuccess
.Procure várias execuções de DAG recentes para localizar uma em que todas as três tarefas (
subscribe_task
,pull_messages_operator
etrigger_target
) estejam nos statusSuccess
.Volte para a guia DAGs e verifique se a coluna Execuções bem-sucedidas do DAG
target_dag
lista uma execução bem-sucedida.
Resumo
Neste tutorial, você aprendeu a usar o Cloud Run functions para publicar mensagens em um tópico do Pub/Sub e implantar um DAG que se inscreve em um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG dos dados da mensagem.
Também há maneiras alternativas de criar e gerenciar assinaturas do Pub/Sub e acionar DAGs que estão fora do escopo deste tutorial. Por exemplo, é possível usar funções do Cloud Run para acionar DAGs do Airflow quando ocorre um evento especificado. Confira nossos tutoriais para testar os outros recursos doGoogle Cloud .
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que contém os recursos ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Excluir recursos individuais
Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.
Console
- Exclua o ambiente do Cloud Composer. Você também exclui o bucket do ambiente durante esse procedimento.
- Exclua o tópico do Pub/Sub,
dag-topic-trigger
. Exclua a função do Cloud Run.
No console do Google Cloud, acesse as funções do Cloud Run.
Clique na caixa de seleção da função que você quer excluir,
pubsub-publisher
.Clique em Excluir e siga as instruções.
Terraform
- Verifique se o script do Terraform não contém entradas para recursos que ainda são necessários para o projeto. Por exemplo, talvez você queira manter algumas APIs ativadas e as permissões do IAM ainda atribuídas (se você adicionou essas definições ao seu script do Terraform).
- Execute
terraform destroy
. - Exclua manualmente o bucket do ambiente. O Cloud Composer não exclui automaticamente. É possível fazer isso no console do Google Cloud ou na CLI do Google Cloud.
A seguir
- Como testar DAGs
- Testar funções HTTP
- Implantar uma função do Cloud Run
- Teste outros recursos do Google Cloud . Confira nossos tutoriais.