Acione DAGs com funções do Cloud e mensagens do Pub/Sub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página explica como criar uma arquitetura push baseada em eventos acionando DAGs do Cloud Composer em resposta a alterações de tópicos do Pub/Sub. Os exemplos neste tutorial demonstram o processamento do ciclo completo da gestão do Pub/Sub, incluindo a gestão de subscrições, como parte do processo DAG. É adequado para alguns dos exemplos de utilização comuns quando precisa de acionar DAGs, mas não quer configurar autorizações de acesso adicionais.

Por exemplo, as mensagens enviadas através do Pub/Sub podem ser usadas como solução se não quiser conceder acesso direto a um ambiente do Cloud Composer por motivos de segurança. Pode configurar uma função do Cloud Run que cria mensagens do Pub/Sub e as publica num tópico do Pub/Sub. Em seguida, pode criar um DAG que extrai mensagens do Pub/Sub e, em seguida, processa estas mensagens.

Neste exemplo específico, cria uma função do Cloud Run e implementa dois DAGs. O primeiro DAG extrai mensagens do Pub/Sub e aciona o segundo DAG de acordo com o conteúdo da mensagem do Pub/Sub.

Este tutorial pressupõe que tem conhecimentos de Python e da Google Cloud consola.

Objetivos

Custos

Este tutorial usa os seguintes componentes faturáveis do Google Cloud:

Depois de concluir este tutorial, pode evitar a faturação contínua eliminando os recursos que criou. Consulte Limpar para ver mais detalhes.

Antes de começar

Para este tutorial, precisa de um Google Cloud projeto. Configure o projeto da seguinte forma:

  1. Na Google Cloud consola, selecione ou crie um projeto:

    Aceder ao seletor de projetos

  2. Certifique-se de que a faturação está ativada para o seu projeto. Saiba como verificar se a faturação está ativada num projeto.

  3. Certifique-se de que o utilizador do Google Cloud projeto tem as seguintes funções para criar os recursos necessários:

    • Utilizador da conta de serviço (roles/iam.serviceAccountUser)
    • Editor do Pub/Sub (roles/pubsub.editor)
    • Administrador de objetos de ambiente e armazenamento (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador das funções do Cloud Run (roles/cloudfunctions.admin)
    • Visualizador de registos (roles/logging.viewer)
  4. Certifique-se de que a conta de serviço que executa a sua função do Cloud Run tem autorizações suficientes no seu projeto para aceder ao Pub/Sub. Por predefinição, as funções do Cloud Run usam a conta de serviço predefinida do App Engine. Esta conta de serviço tem a função de Editor, que tem autorizações suficientes para este tutorial.

Ative APIs para o seu projeto

Consola

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Ative a API Cloud Composer no seu projeto adicionando as seguintes definições de recursos ao seu script do Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Substitua <PROJECT_ID> pelo ID do projeto do seu projeto. Por exemplo, example-project.

Crie o seu ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte deste procedimento, concede a função Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) à conta do agente de serviço do Composer. O Cloud Composer usa esta conta para realizar operações no seu Google Cloud projeto.

Crie um tópico do Pub/Sub

Este exemplo aciona um DAG em resposta a uma mensagem enviada para um tópico do Pub/Sub. Crie um tópico Pub/Sub para usar neste exemplo:

Consola

  1. Na Google Cloud consola, aceda à página Tópicos do Pub/Sub.

    Aceder aos tópicos do Pub/Sub

  2. Clique em Criar tópico.

  3. No campo ID do tópico, introduza dag-topic-trigger como ID do seu tópico.

  4. Deixe as outras opções nas predefinições.

  5. Clique em Criar tópico.

gcloud

Para criar um tópico, execute o comando gcloud pubsub topics create na CLI gcloud:

gcloud pubsub topics create dag-topic-trigger

Terraform

Adicione as seguintes definições de recursos ao seu script do Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Substitua <PROJECT_ID> pelo ID do projeto do seu projeto. Por exemplo, example-project.

Carregue os seus DAGs

Carregue DAGs para o seu ambiente:

  1. Guarde o seguinte ficheiro DAG no seu computador local.
  2. Substitua <PROJECT_ID> pelo ID do projeto do seu projeto. Por exemplo, example-project.
  3. Carregue o ficheiro DAG editado para o seu ambiente.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

O código de exemplo contém dois DAGs: trigger_dag e target_dag.

O trigger_dag DAG subscreve um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG dos dados da mensagem do Pub/Sub. Neste exemplo, trigger_dag aciona o DAG target_dag, que envia mensagens para os registos de tarefas.

O DAG trigger_dag contém as seguintes tarefas:

  • subscribe_task: subscreva um tópico do Pub/Sub.
  • pull_messages_operator: leia os dados de uma mensagem do Pub/Sub com PubSubPullOperator.
  • trigger_target_dag: acionar outro DAG (neste exemplo, target_dag) de acordo com os dados nas mensagens extraídas do tópico do Pub/Sub.

O DAG contém apenas uma tarefa: output_to_logs.target_dag Esta tarefa imprime mensagens no registo de tarefas com um atraso de um segundo.

Implemente uma função do Cloud Run que publique mensagens num tópico do Pub/Sub

Nesta secção, implementa uma função do Cloud Run que publica mensagens num tópico do Pub/Sub.

Crie uma função do Cloud Run e especifique a respetiva configuração

Consola

  1. Na Google Cloud consola, aceda à página Funções do Cloud Run.

    Aceder às funções do Cloud Run

  2. Clique em Criar função.

  3. No campo Ambiente, selecione 1.ª geração.

  4. No campo Nome da função, introduza o nome da função: pubsub-publisher.

  5. No campo Tipo de acionador, selecione HTTP.

  6. Na secção Autenticação, selecione Permitir invocações não autenticadas. Esta opção concede aos utilizadores não autenticados a capacidade de invocar uma função HTTP.

  7. Clique em Guardar.

  8. Clique em Seguinte para avançar para o passo Código.

Terraform

Considere usar a Google Cloud consola para este passo, porque não existe uma forma simples de gerir o código fonte da função a partir do Terraform.

Este exemplo demonstra como pode carregar uma função do Cloud Run a partir de um ficheiro de arquivo ZIP local criando um contentor do Cloud Storage, armazenando o ficheiro neste contentor e, em seguida, usando o ficheiro do contentor como origem para a função do Cloud Run. Se usar esta abordagem, o Terraform não atualiza automaticamente o código fonte da sua função, mesmo que crie um novo ficheiro de arquivo. Para voltar a carregar o código da função, pode alterar o nome do ficheiro do arquivo.

  1. Transfira os ficheiros pubsub_publisher.py e requirements.txt.
  2. No ficheiro pubsub_publisher.py, substitua <PROJECT_ID> pelo ID do projeto do seu projeto. Por exemplo, example-project.
  3. Crie um arquivo ZIP com o nome pubsub_function.zip com o ficheiro pbusub_publisner.py e o ficheiro requirements.txt.
  4. Guarde o arquivo ZIP num diretório onde o script do Terraform está armazenado.
  5. Adicione as seguintes definições de recursos ao seu script do Terraform e substitua <PROJECT_ID> pelo ID do projeto.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Especifique os parâmetros do código da função do Cloud Run

Consola

  1. No passo Código, no campo Tempo de execução, selecione o tempo de execução da linguagem que a sua função usa. Neste exemplo, selecione Python 3.10.

  2. No campo Ponto de entrada, introduza pubsub_publisher. Este é o código que é executado quando a sua função do Cloud Run é executada. O valor desta flag tem de ser um nome de função ou um nome de classe totalmente qualificado que exista no seu código fonte.

Terraform

Ignorar este passo. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function.

Carregue o código da função do Cloud Run

Consola

No campo Código fonte, selecione a opção adequada para indicar como fornece o código fonte da função. Neste tutorial, adicione o código da função através do editor inline das funções do Cloud Run. Em alternativa, pode carregar um ficheiro ZIP ou usar os Cloud Source Repositories.

  1. Coloque o seguinte exemplo de código no ficheiro main.py.
  2. Substitua <PROJECT_ID> pelo ID do projeto do seu projeto. Por exemplo, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Ignorar este passo. Os parâmetros da função do Cloud Run já estão definidos no recurso google_cloudfunctions_function.

Especifique as dependências da função do Cloud Run

Consola

Especifique as dependências das funções no ficheiro de metadados requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

Quando implementa a sua função, as funções do Cloud Run transferem e instalam as dependências declaradas no ficheiro requirements.txt, uma linha por pacote. Este ficheiro tem de estar no mesmo diretório que o ficheiro main.py que contém o código da sua função. Para ver mais detalhes, consulte os ficheiros de requisitos na pip documentação.

Terraform

Ignorar este passo. As dependências da função do Cloud Run são definidas no ficheiro requirements.txt no arquivo pubsub_function.zip.

Implemente a sua função do Cloud Run

Consola

Clique em Implementar. Quando a implementação termina com êxito, a função aparece com uma marca de verificação verde na página Funções do Cloud Run na Google Cloud consola.

Certifique-se de que a conta de serviço que executa a sua função do Cloud Run tem autorizações suficientes no seu projeto para aceder ao Pub/Sub.

Terraform

  1. Inicialize o Terraform:

    terraform init
    
  2. Reveja a configuração e verifique se os recursos que o Terraform vai criar ou atualizar correspondem às suas expetativas:

    terraform plan
    
  3. Para verificar se a sua configuração é válida, execute o seguinte comando:

    terraform validate
    
  4. Aplique a configuração do Terraform executando o seguinte comando e introduzindo yes no comando:

    terraform apply
    

Aguarde até que o Terraform apresente a mensagem "Apply complete!" (Aplicação concluída!).

Na Google Cloud consola, navegue para os seus recursos na IU para se certificar de que o Terraform os criou ou atualizou.

Teste a sua função do Cloud Run

Para verificar se a sua função publica uma mensagem num tópico Pub/Sub e se os DAGs de exemplo funcionam conforme previsto:

  1. Verifique se os DAGs estão ativos:

    1. Na Google Cloud consola, aceda à página Ambientes.

      Aceder a Ambientes

    2. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

    3. Aceda ao separador DAGs.

    4. Verifique os valores na coluna Estado para DAGs com os nomes trigger_dag e target_dag. Ambos os DAGs têm de estar no estado Active.

  2. Envie uma mensagem Pub/Sub de teste. Pode fazê-lo no Cloud Shell:

    1. Na Google Cloud consola, aceda à página Funções.

      Aceder às funções do Cloud Run

    2. Clique no nome da sua função, pubsub-publisher.

    3. Aceda ao separador Testes.

    4. Na secção Configurar evento de acionamento, introduza o seguinte par chave-valor JSON: {"message": "target_dag"}. Não modifique o par chave-valor, porque esta mensagem aciona o DAG de teste mais tarde.

    5. Na secção Comando de teste, clique em Testar no Cloud Shell.

    6. No terminal do Cloud Shell, aguarde até que um comando seja apresentado automaticamente. Prima Enter para executar este comando.

    7. Se for apresentada a mensagem Autorizar Cloud Shell, clique em Autorizar.

    8. Verifique se o conteúdo da mensagem corresponde à mensagem do Pub/Sub. Neste exemplo, a mensagem de saída tem de começar com Message b'target_dag' with message_length 10 published to como resposta da sua função.

  3. Verifique se o target_dag foi acionado:

    1. Aguarde, pelo menos, um minuto para que uma nova execução do DAG de trigger_dag seja concluída.

    2. Na Google Cloud consola, aceda à página Ambientes.

      Aceder a Ambientes

    3. Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.

    4. Aceda ao separador DAGs.

    5. Clique em trigger_dag para aceder à página Detalhes do DAG. No separador Execuções, é apresentada uma lista de execuções de DAG para o DAG trigger_dag.

      Este DAG é executado a cada minuto e processa todas as mensagens do Pub/Sub enviadas a partir da função. Se não forem enviadas mensagens, a tarefa trigger_target é marcada como Skipped nos registos de execução do DAG. Se os DAGs foram acionados, a tarefa trigger_target é marcada como Success.

    6. Examine várias execuções recentes do DAG para localizar uma execução do DAG em que todas as três tarefas (subscribe_task, pull_messages_operator e trigger_target) estão nos estados Success.

    7. Volte ao separador DAGs e verifique se a coluna Execuções bem-sucedidas do DAG target_dag apresenta uma execução bem-sucedida.

Resumo

Neste tutorial, aprendeu a usar as funções do Cloud Run para publicar mensagens num tópico do Pub/Sub e implementar um DAG que subscreve um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID do DAG dos dados da mensagem.

Também existem formas alternativas de criar e gerir subscrições do Pub/Sub e acionar DAGs que estão fora do âmbito deste tutorial. Por exemplo, pode usar funções do Cloud Run para acionar DAGs do Airflow quando ocorre um evento especificado. Consulte os nossos tutoriais para experimentar as outras Google Cloud funcionalidades.

Limpar

Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados neste tutorial, elimine o projeto que contém os recursos ou mantenha o projeto e elimine os recursos individuais.

Elimine o projeto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimine recursos individuais

Se planeia explorar vários tutoriais e inícios rápidos, a reutilização de projetos pode ajudar a evitar exceder os limites de quota do projeto.

Consola

  1. Elimine o ambiente do Cloud Composer. Também elimina o contentor do ambiente durante este procedimento.
  2. Elimine o tópico do Pub/Sub, dag-topic-trigger.
  3. Elimine a função do Cloud Run.

    1. Na Google Cloud consola, aceda às funções do Cloud Run.

      Aceder às funções do Cloud Run

    2. Clique na caixa de verificação da função que quer eliminar, pubsub-publisher.

    3. Clique em Eliminar e, de seguida, siga as instruções.

Terraform

  1. Certifique-se de que o script do Terraform não contém entradas para recursos que o seu projeto ainda precisa. Por exemplo, pode querer manter algumas APIs ativadas e as autorizações do IAM ainda atribuídas (se adicionou essas definições ao seu script do Terraform).
  2. Corrida terraform destroy.
  3. Elimine manualmente o contentor do ambiente. O Cloud Composer não a elimina automaticamente. Pode fazê-lo a partir da Google Cloud consola ou da Google Cloud CLI.

O que se segue?