Acione DAGs usando o Cloud Functions e as mensagens do Pub/Sub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Esta página o orienta na criação de uma arquitetura de envio baseada em eventos por como acionar DAGs do Cloud Composer em resposta ao Pub/Sub mudanças de tópico. Os exemplos neste tutorial demonstram como lidar com o ciclo completo do gerenciamento do Pub/Sub, incluindo o gerenciamento de assinaturas, do processo do DAG. Ele é adequado para alguns dos casos de uso comuns quando você precisa acionar DAGs, mas não quer configurar permissões de acesso extras.

Por exemplo, as mensagens enviadas pelo Pub/Sub podem ser usadas se você não quiser dar acesso direto a um bucket do Cloud Composer, por motivos de segurança. É possível configurar um do função do Cloud que cria mensagens do Pub/Sub e e as publica em um tópico do Pub/Sub. Em seguida, é possível criar um DAG extrai mensagens do Pub/Sub e as processa.

Neste exemplo específico, você cria uma função do Cloud e implanta dois DAGs. O primeiro DAG extrai mensagens do Pub/Sub e aciona de acordo com o conteúdo da mensagem do Pub/Sub.

Para acompanhar este tutorial, é necessário ter familiaridade com o Python e o console do Google Cloud.

Objetivos

Custos

Neste tutorial, usamos o seguinte componente faturável do Google Cloud:

Depois de concluir este tutorial, para evitar o faturamento contínuo, exclua nos recursos criados. Consulte Limpeza para mais detalhes.

Antes de começar

Para este tutorial, você precisa ter uma conta do Google Cloud projeto. Configure o projeto da seguinte maneira:

  1. No console do Google Cloud, selecione ou crie um projeto:

    Acesse o seletor de projetos

  2. Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.

  3. Verifique se o usuário do projeto do Google Cloud tem os seguintes papéis para criar os recursos necessários:

    • Usuário da conta de serviço (roles/iam.serviceAccountUser)
    • Editor do Pub/Sub (roles/pubsub.editor)
    • Administrador de ambiente e de objetos do Storage (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador do Cloud Functions (roles/cloudfunctions.admin)
    • Visualizador de registros (roles/logging.viewer)
  4. Certifique-se de que o conta de serviço que executa sua função do Cloud tem permissões suficientes no projeto para acessar o Pub/Sub. De padrão, o Cloud Functions usa o Conta de serviço padrão do App Engine Essa conta de serviço tem o papel Editor, que tem permissões para este tutorial.

Ativar as APIs do projeto

Console

Ative as APIs Cloud Composer, Cloud Functions, and Pub/Sub.

Ative as APIs

gcloud

Ative as APIs Cloud Composer, Cloud Functions, and Pub/Sub:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Ative a API Cloud Composer no seu projeto adicionando o seguinte definições de recursos ao script do Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Substitua <PROJECT_ID> pelo ID do projeto. do seu projeto. Por exemplo, example-project.

criar o ambiente do Cloud Composer

Crie um ambiente do Cloud Composer 2.

Como parte desse procedimento, você concede a extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) para o agente de serviço do Composer do Compute Engine. O Cloud Composer usa essa conta para realizar operações no projeto do Google Cloud.

Criar um tópico do Pub/Sub

Este exemplo aciona um DAG em resposta a uma mensagem enviada a um tópico do Pub/Sub. Crie um tópico do Pub/Sub para usar neste exemplo:

Console

  1. No console do Google Cloud, acesse a página Tópicos do Pub/Sub.

    Acessar tópicos do Pub/Sub

  2. Clique em Criar tópico.

  3. No campo ID do tópico, insira dag-topic-trigger como o ID da sua tópico.

  4. Não altere as outras opções.

  5. Clique em Criar tópico.

gcloud

Para criar um tópico, execute o gcloud pubsub topics create na Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Adicione as seguintes definições de recursos ao script do Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Substitua <PROJECT_ID> pelo ID do projeto. do seu projeto. Por exemplo, example-project.

Fazer upload dos DAGs

Faça upload dos DAGs para seu ambiente:

  1. Salve o seguinte arquivo DAG no seu computador local.
  2. Substitua <PROJECT_ID> pelo ID do projeto. do seu projeto. Por exemplo, example-project.
  3. Faça upload do arquivo DAG editado para seu ambiente.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

O exemplo de código contém dois DAGs: trigger_dag e target_dag.

O DAG trigger_dag se inscreve em um tópico do Pub/Sub, extrai mensagens do Pub/Sub e aciona outro DAG especificado no ID dele dos dados de mensagens do Pub/Sub. Neste exemplo, trigger_dag aciona o DAG target_dag, que envia mensagens para os registros de tarefas.

O DAG trigger_dag contém as seguintes tarefas:

  • subscribe_task: assina um tópico do Pub/Sub.
  • pull_messages_operator: lê os dados de uma mensagem do Pub/Sub com PubSubPullOperator.
  • trigger_target_dag: acionar outro DAG (neste exemplo, target_dag) de acordo com os dados nas mensagens extraídas do Pub/Sub tópico.

O DAG target_dag contém apenas uma tarefa: output_to_logs. Esta tarefa imprime mensagens no registro de tarefas com um segundo de atraso.

Implantar uma função do Cloud que publique mensagens em um tópico do Pub/Sub

Nesta seção, você implantará uma função do Cloud que publica mensagens em um tópico do Pub/Sub.

Criar uma função do Cloud e especificar a configuração dela

Console

  1. No Console do Google Cloud, acesse a página Cloud Functions.

    Acesse o Cloud Functions

  2. Clique em Criar função.

  3. No campo Ambiente selecione 1a geração.

  4. No campo Nome da função, insira o nome da sua função: pubsub-publisher

  5. No campo Tipo de gatilho, selecione HTTP.

  6. Na seção Autenticação, selecione Permita invocações não autenticadas. Essa opção concede usuários não autenticados invocar uma função HTTP.

  7. Clique em "Salvar".

  8. Clique em Próximo para acessar a etapa Código.

Terraform

Considere usar o console do Google Cloud nesta etapa, porque não há maneira simples de gerenciar o código-fonte da função no Terraform.

Este exemplo demonstra como fazer upload de uma função do Cloud a partir de um arquivo zip local criando um bucket do Cloud Storage armazenar o arquivo neste bucket e, em seguida, usar o arquivo do bucket como um fonte para a função do Cloud. Se você usar essa abordagem, O Terraform não atualiza automaticamente o código-fonte da sua função, mesmo que você crie um novo arquivo morto. Para fazer um novo upload do código da função, pode alterar o nome do arquivo.

  1. Faça o download do pubsub_publisher.py e o requirements.txt .
  2. No arquivo pubsub_publisher.py, substitua <PROJECT_ID> pelo O ID do seu projeto. Por exemplo, example-project.
  3. Crie um arquivo zip chamado pubsub_function.zip com o pbusub_publisner.py e o arquivo requirements.txt.
  4. Salve o arquivo ZIP no diretório em que o script do Terraform está armazenado.
  5. Adicione as definições de recursos a seguir ao script do Terraform e Substitua <PROJECT_ID> pelo ID do seu projeto.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Especificar parâmetros do código da função do Cloud

Console

  1. Na etapa Código, no campo Ambiente de execução, selecione o idioma o ambiente de execução usado pela função. Neste exemplo, selecione Python 3.10.

  2. No campo Ponto de entrada, insira pubsub_publisher. Este é o código que é executado quando a função do Cloud é executada. O valor de essa sinalização deve ser um nome de função ou um nome de classe totalmente qualificado que no código-fonte.

Terraform

Pule esta etapa. Os parâmetros da função do Cloud já estão definidos o recurso google_cloudfunctions_function.

Fazer upload do código da função do Cloud

Console

No campo Código-fonte, selecione a opção apropriada para como você fornecer o código-fonte da função. Neste tutorial, adicione o código da função usando o Cloud Functions Editor in-line. Como alternativa, você pode fazer upload de um arquivo ZIP ou usar Cloud Source Repositories.

  1. Coloque o exemplo de código a seguir no arquivo main.py.
  2. Substitua <PROJECT_ID> pelo ID do projeto. do seu projeto. Por exemplo, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Pule esta etapa. Os parâmetros da função do Cloud já estão definidos o recurso google_cloudfunctions_function.

Especificar as dependências da função do Cloud

Console

Especifique as dependências da função no arquivo de metadados requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

Quando você implanta a função, o Cloud Functions faz o download e instala dependências declaradas no arquivo requirements.txt, uma linha por pacote. Esse arquivo deve estar no mesmo diretório que o arquivo main.py que contém o código da função. Para mais detalhes, consulte Arquivos de requisitos na documentação de pip.

Terraform

Pule esta etapa. As dependências da função do Cloud são definidas em requirements.txt no arquivo pubsub_function.zip.

Implantar a função do Cloud

Console

Clique em Implantar. Quando a implantação é concluída, a função é exibida com uma marca de seleção verde na página Cloud Functions na console do Google Cloud.

Confirme se a conta de serviço que executa a função do Cloud tem permissões suficientes no projeto para acessar Pub/Sub

Terraform

  1. Inicialize o Terraform:

    terraform init
    
  2. Revise a configuração e verifique se os recursos que o Terraform está criar ou atualizar correspondem às suas expectativas:

    terraform plan
    
  3. Para verificar se a configuração é válida, execute o seguinte comando:

    terraform validate
    
  4. Aplique a configuração do Terraform executando o seguinte comando e inserindo yes no prompt:

    terraform apply
    

Aguarde até que o Terraform exiba a mensagem "Apply complete!".

No console do Google Cloud, navegue até os recursos na UI para fazer que o Terraform tenha criado ou atualizado essas informações.

Testar a Função do Cloud

Para verificar se a função publica uma mensagem em um tópico do Pub/Sub e se os DAGs de exemplo funcionam conforme o esperado:

  1. Verifique se os DAGs estão ativos:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

    3. Acesse a guia DAGs.

    4. Verifique os valores na coluna Estado para DAGs com os nomes trigger_dag e target_dag. Os dois DAGs precisam estar no estado Active.

  2. Envie uma mensagem de teste do Pub/Sub. Você pode fazer isso em O Cloud Shell:

    1. No console do Google Cloud, acesse a página Funções.

      Acesse o Cloud Functions

    2. Clique no nome da função, pubsub-publisher.

    3. Acesse a guia Testes.

    4. Na seção Configurar evento de acionamento, insira o seguinte: Chave-valor JSON: {"message": "target_dag"}. Não modifique a chave-valor porque essa mensagem aciona o DAG de teste mais tarde.

    5. Na seção Testar comando, clique em Testar no Cloud Shell.

    6. No Terminal do Cloud Shell, aguarde até que um comando seja exibido. automaticamente. Execute este comando pressionando Enter.

    7. Se a mensagem Autorizar o Cloud Shell for exibida, Clique em Autorizar.

    8. Verifique se o conteúdo da mensagem corresponde ao código do Pub/Sub mensagem. Neste exemplo, a mensagem de saída precisa começar com Message b'target_dag' with message_length 10 published to como resposta da sua função.

  3. Verifique se target_dag foi acionado:

    1. Aguarde pelo menos um minuto para que uma nova execução do DAG de trigger_dag seja concluída é concluída.

    2. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    3. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

    4. Acesse a guia DAGs.

    5. Clique em trigger_dag para acessar a página Detalhes do DAG. Na guia Execuções uma lista de execuções do DAG para o DAG trigger_dag será exibida.

      Esse DAG é executado a cada minuto e processa todos os dados do Pub/Sub mensagens enviadas pela função. Se nenhuma mensagem foi enviada, o A tarefa trigger_target está marcada como Skipped nos registros de execução do DAG. Se os DAGs foram acionados, e a tarefa trigger_target foi marcada como Success.

    6. Analise várias execuções recentes de DAGs para localizar uma execução em que todos três tarefas (subscribe_task, pull_messages_operator e trigger_target) estão em status Success.

    7. Volte para a guia DAGs e verifique se as Execuções bem-sucedidas do DAG target_dag lista uma execução bem-sucedida.

Resumo

Neste tutorial, você aprendeu a usar o Cloud Functions para publicar mensagens em um tópico do Pub/Sub e implantar um DAG que assina um Pub/Sub, extrai mensagens e gatilhos do Pub/Sub outro DAG especificado no ID do DAG dos dados da mensagem.

Há também formas alternativas de Como criar e gerenciar assinaturas do Pub/Sub e acionar DAGs que estão fora do escopo deste tutorial. Por exemplo, é possível usar o Cloud Functions para acionar DAGs do Airflow quando ocorre um evento especificado. Confira nossos tutoriais para testar as outras os recursos do Google Cloud para você.

Limpar

Para evitar cobranças dos recursos na sua conta do Google Cloud usados neste tutorial, exclua o projeto que contém os recursos ou manter o projeto e excluir os recursos individuais.

Exclua o projeto

    Exclua um projeto do Google Cloud:

    gcloud projects delete PROJECT_ID

Excluir recursos individuais

Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.

Console

  1. Exclua o ambiente do Cloud Composer. Você também excluir o bucket do ambiente durante este procedimento.
  2. Exclua o tópico do Pub/Sub, dag-topic-trigger.
  3. Exclua a função do Cloud.

    1. No Console do Google Cloud, acesse o Cloud Functions.

      Acesse o Cloud Functions

    2. Clique na caixa de seleção da função que você quer excluir. pubsub-publisher:

    3. Clique em Excluir e siga as instruções.

Terraform

  1. Confirme que o script do Terraform não contém entradas para recursos que ainda são exigidos pelo seu projeto. Por exemplo, manter algumas APIs ativadas e o IAM permissões ainda atribuídas (se você adicionou essas definições ao seu script do Terraform).
  2. Execute terraform destroy.
  3. Exclua manualmente o bucket do ambiente. Cloud Composer não o exclui automaticamente. Você pode fazer isso console ou Google Cloud CLI.

A seguir