Attiva i DAG utilizzando Cloud Functions e i messaggi Pub/Sub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina illustra la creazione di un'architettura di push basata su eventi attivazione dei DAG di Cloud Composer in risposta a Pub/Sub modifiche all'argomento. Gli esempi in questo tutorial mostrano la gestione dell'intero ciclo della gestione di Pub/Sub, inclusa la gestione degli abbonamenti, nell'ambito del processo DAG. È adatto ad alcuni casi d'uso comuni quando devi attivare i DAG, ma non vuoi configurare autorizzazioni di accesso aggiuntive.

Ad esempio, i messaggi inviati tramite Pub/Sub possono essere utilizzati come soluzione se non vuoi fornire accesso diretto a un ambiente Cloud Composer per motivi di sicurezza. Puoi configurare funzione Cloud Run che crea messaggi Pub/Sub e le pubblica su un argomento Pub/Sub. Puoi quindi creare un DAG esegue il pull dei messaggi Pub/Sub e li gestisce.

In questo esempio specifico, crei una funzione Cloud Run ed eseguirai il deployment due DAG. Il primo DAG esegue il pull dei messaggi Pub/Sub e attiva il secondo DAG in base al contenuto del messaggio Pub/Sub.

Questo tutorial presuppone che tu abbia familiarità con Python e con la console Google Cloud.

Obiettivi

Costi

Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:

Dopo aver completato questo tutorial, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori dettagli, consulta Pulizia.

Prima di iniziare

Per questo tutorial, hai bisogno di un progetto Google Cloud. Configura il progetto nel seguente modo:

  1. Nella console Google Cloud, seleziona o crea un progetto:

    Vai al selettore di progetti

  2. Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.

  3. Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:

    • Utente dell'account di servizio (roles/iam.serviceAccountUser)
    • Editor Pub/Sub (roles/pubsub.editor)
    • Amministratore oggetti di ambiente e archiviazione (roles/composer.environmentAndStorageObjectAdmin)
    • Amministratore delle funzioni Cloud Run (roles/cloudfunctions.admin)
    • Visualizzatore log (roles/logging.viewer)
  4. Assicurati che il service account che esegue la funzione Cloud Run disponga delle autorizzazioni sufficienti nel progetto per accedere a Pub/Sub. Per impostazione predefinita, le funzioni Cloud Run utilizzano l'account di servizio predefinito di App Engine. Questo account di servizio ha il ruolo Editor, che dispone delle autorizzazioni sufficienti per questo tutorial.

Abilita le API per il tuo progetto

Console

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Abilita l'API Cloud Composer nel tuo progetto aggiungendo quanto segue le definizioni delle risorse per lo script Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Sostituisci <PROJECT_ID> con l'ID progetto del tuo progetto. Ad esempio: example-project.

Crea l'ambiente Cloud Composer

Crea un ambiente Cloud Composer 2.

Nell'ambito di questa procedura, devi concedere il ruolo Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) all'account agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni nel tuo progetto Google Cloud.

crea un argomento Pub/Sub

Questo esempio attiva un DAG in risposta a un messaggio inviato tramite push a un Pub/Sub. Crea un argomento Pub/Sub da utilizzare in questo esempio:

Console

  1. Nella console Google Cloud, vai alla pagina Argomenti Pub/Sub.

    Vai ad Argomenti Pub/Sub.

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci dag-topic-trigger come ID per l'argomento.

  4. Lascia le altre opzioni impostate sui valori predefiniti.

  5. Fai clic su Crea argomento.

gcloud

Per creare un argomento, esegui il comando gcloud pubsub topics create in Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Aggiungi le seguenti definizioni di risorse allo script Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Sostituisci <PROJECT_ID> con l'ID progetto del tuo progetto. Ad esempio: example-project.

Carica i DAG

Carica i DAG nel tuo ambiente:

  1. Salva il seguente file DAG sul tuo computer locale.
  2. Sostituisci <PROJECT_ID> con l'ID progetto del tuo progetto. Ad esempio: example-project.
  3. Carica il file DAG modificato nel tuo ambiente.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

Il codice campione contiene due DAG: trigger_dag e target_dag.

Il DAG trigger_dag si iscrive a un argomento Pub/Sub, estrae i messaggi Pub/Sub e attiva un altro DAG specificato nell'ID DAG dei dati del messaggio Pub/Sub. In questo esempio, trigger_dag trigger il DAG target_dag, che invia i messaggi ai log delle attività.

Il DAG trigger_dag contiene le seguenti attività:

  • subscribe_task: iscriviti a un argomento Pub/Sub.
  • pull_messages_operator: lettura dei dati di un messaggio Pub/Sub con PubSubPullOperator.
  • trigger_target_dag: attiva un altro DAG (in questo esempio, target_dag) in base ai dati dei messaggi estratti dall'argomento Pub/Sub.

Il DAG target_dag contiene una sola attività: output_to_logs. Questa attività stampa i messaggi nel log delle attività con un ritardo di un secondo.

.

Esegui il deployment di una funzione Cloud Run che pubblica messaggi in un argomento Pub/Sub

In questa sezione eseguirai il deployment di una funzione Cloud Run che pubblica messaggi in un argomento Pub/Sub.

Crea una funzione Cloud Run e specifica la relativa configurazione

Console

  1. Nella console Google Cloud, vai alla pagina Funzioni Cloud Run.

    Vai alle funzioni Cloud Run

  2. Fai clic su Crea funzione.

  3. Nel campo Ambiente, seleziona 1ª gen..

  4. Nel campo Nome funzione, inserisci il nome della funzione: pubsub-publisher.

  5. Nel campo Tipo di trigger, seleziona HTTP.

  6. Nella sezione Autenticazione, seleziona Consenti chiamate non autenticate. Questa opzione concede utenti non autenticati di richiamare una funzione HTTP.

  7. Fai clic su Salva.

  8. Fai clic su Avanti per andare al passaggio Codice.

Terraform

Valuta la possibilità di utilizzare la console Google Cloud per questo passaggio, in quanto non esiste un modo semplice per gestire il codice sorgente della funzione da Terraform.

Questo esempio mostra come caricare una funzione Cloud Run da un file di archivio ZIP locale creando un bucket Cloud Storage, per archiviare il file in questo bucket, utilizzandolo nel bucket come per la funzione Cloud Run. Se utilizzi questo approccio, Terraform non aggiorna automaticamente il codice sorgente della funzione, anche se crei un nuovo file di archivio. Per ricaricare il codice della funzione, puoi modificare il nome del file dell'archivio.

  1. Scarica i file pubsub_publisher.py e requirements.txt.
  2. Nel file pubsub_publisher.py, sostituisci <PROJECT_ID> con il ID progetto del tuo progetto. Per ad esempio example-project.
  3. Crea un archivio ZIP denominato pubsub_function.zip con pbusub_publisner.py e il file requirements.txt.
  4. Salva l'archivio ZIP in una directory in cui è archiviato lo script Terraform.
  5. Aggiungi le seguenti definizioni di risorse allo script Terraform e sostituisci <PROJECT_ID> con l'ID del progetto.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Specifica i parametri di codice della funzione Cloud Run

Console

  1. Nel passaggio Codice, seleziona la lingua nel campo Runtime il runtime utilizzato dalla funzione. In questo esempio, seleziona Python 3.10.

  2. Nel campo Punto di contatto, inserisci pubsub_publisher. Questo è il codice che viene eseguito durante l'esecuzione della funzione Cloud Run. Il valore di questo flag deve essere un nome di funzione o un nome di classe completo esistente nel codice sorgente.

Terraform

Ignora questo passaggio. I parametri della funzione Cloud Run sono già definiti in la risorsa google_cloudfunctions_function.

Carica il codice della funzione Cloud Run

Console

Nel campo Codice sorgente, seleziona l'opzione appropriata per fornisci il codice sorgente della funzione. In questo tutorial, aggiungi il codice della tua funzione utilizzando le funzioni di Cloud Run Editor incorporato. In alternativa, puoi caricare un file ZIP o utilizzare Cloud Source Repositories.

  1. Inserisci il seguente esempio di codice nel file main.py.
  2. Sostituisci <PROJECT_ID> con l'ID progetto del tuo progetto. Ad esempio: example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Ignora questo passaggio. I parametri della funzione Cloud Run sono già definiti in la risorsa google_cloudfunctions_function.

Specifica le dipendenze delle funzioni Cloud Run

Console

Specifica le dipendenze delle funzioni nel file di metadati requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

Quando esegui il deployment della funzione, le funzioni Cloud Run scaricano e installano le dipendenze dichiarate nel file requirements.txt, una riga per pacchetto. Questo file deve trovarsi nella stessa directory del file main.py che contiene il codice della funzione. Per ulteriori dettagli, consulta la sezione File dei requisiti nella documentazione di pip.

Terraform

Ignora questo passaggio. Le dipendenze delle funzioni Cloud Run sono definite nel file requirements.txt nell'archivio pubsub_function.zip.

Esegui il deployment della funzione Cloud Run

Console

Fai clic su Esegui il deployment. Al termine del deployment, la funzione viene visualizzata con un segno di spunta verde nella pagina Funzioni Cloud Run della console Google Cloud.

Assicurati che l'account di servizio che esegue la funzione Cloud Run disponi di autorizzazioni sufficienti nel tuo progetto per accedere in Pub/Sub.

Terraform

  1. Inizializza Terraform:

    terraform init
    
  2. Rivedi la configurazione e verifica che le risorse di Terraform che crei o aggiorni in base alle tue aspettative:

    terraform plan
    
  3. Per verificare se la configurazione è valida, esegui il seguente comando:

    terraform validate
    
  4. Applica la configurazione Terraform eseguendo il comando seguente inserendo sì nel prompt:

    terraform apply
    

Attendi che Terraform mostri il messaggio "Applicazione completata".

Nella console Google Cloud, vai alle risorse nell'interfaccia utente per verificare che Terraform le abbia create o aggiornate.

Testa la funzione Cloud Run

per verificare che la funzione pubblichi un messaggio in un argomento Pub/Sub. e che i DAG di esempio funzionino come previsto:

  1. Controlla che i DAG siano attivi:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Si apre la pagina Dettagli ambiente.

    3. Vai alla scheda DAG.

    4. Verifica i valori nella colonna Stato per i DAG denominati trigger_dag e target_dag. Entrambi i DAG devono essere nello stato Active.

  2. Invia un messaggio Pub/Sub di prova. Puoi farlo in Cloud Shell:

    1. Nella console Google Cloud, vai alla pagina Funzioni.

      Vai alle funzioni Cloud Run

    2. Fai clic sul nome della funzione, pubsub-publisher.

    3. Vai alla scheda Test.

    4. Nella sezione Configura evento di attivazione, inserisci la seguente coppia chiave-valore JSON: {"message": "target_dag"}. Non modificare la coppia chiave-valore. perché questo messaggio attiva il DAG di test in un secondo momento.

    5. Nella sezione Comando di test, fai clic su Testa in Cloud Shell.

    6. Nel terminale Cloud Shell, attendi che venga visualizzato un comando automaticamente. Esegui questo comando premendo Enter.

    7. Se viene visualizzato il messaggio Autorizza Cloud Shell, fai clic su Autorizza.

    8. Verifica che i contenuti del messaggio corrispondano a Pub/Sub per creare un nuovo messaggio email. In questo esempio, il messaggio di output deve iniziare con Message b'target_dag' with message_length 10 published to come dalla tua funzione.

  3. Verifica che target_dag sia stato attivato:

    1. Attendi almeno un minuto per il completamento di una nuova esecuzione DAG di trigger_dag.

    2. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    3. Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Viene visualizzata la pagina Dettagli dell'ambiente.

    4. Vai alla scheda DAG.

    5. Fai clic su trigger_dag per andare alla pagina Dettagli DAG. Nella scheda Corsa , viene visualizzato un elenco delle esecuzioni di DAG per il DAG trigger_dag.

      Questo DAG viene eseguito ogni minuto ed elabora tutti i Pub/Sub di messaggi inviati dalla funzione. Se non è stato inviato nessun messaggio, L'attività trigger_target è contrassegnata come Skipped nei log di esecuzione dei DAG. Se sono stati attivati i DAG, l'attività trigger_target viene contrassegnata come Success.

    6. Esamina diverse esecuzioni di DAG recenti per individuare un'esecuzione di DAG in cui tutte e tre le attività (subscribe_task, pull_messages_operator e trigger_target) sono in stato Success.

    7. Torna alla scheda DAG e controlla che la colonna Esecuzioni riuscite per il DAG target_dag mostri un'esecuzione riuscita.

Riepilogo

In questo tutorial hai imparato a utilizzare le funzioni di Cloud Run per pubblicare in un argomento Pub/Sub ed eseguire il deployment di un DAG che sottoscrive una argomento Pub/Sub, esegue il pull dei messaggi Pub/Sub e dei trigger a un altro DAG specificato nell'ID DAG dei dati dei messaggi.

Esistono anche metodi alternativi per creare e gestire le sottoscrizioni Pub/Sub e attivare i DAG che non rientrano nell'ambito di questo tutorial. Ad esempio, puoi utilizzare le funzioni Cloud Run per attivare i DAG Airflow quando si verifica un evento specifico. Dai un'occhiata ai nostri tutorial per provare gli altri le funzionalità di Google Cloud.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse usati in questo tutorial, elimina il progetto che contiene le risorse o mantenere il progetto ed eliminare le singole risorse.

Elimina il progetto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Elimina singole risorse

Se intendi esplorare più tutorial e guide rapide, il riuso dei progetti ti aiuta a non superare i limiti di quota.

Console

  1. Eliminare l'ambiente Cloud Composer. Inoltre, durante questa procedura viene eliminato il bucket dell'ambiente.
  2. Elimina l'argomento Pub/Sub, dag-topic-trigger.
  3. Elimina la funzione Cloud Run.

    1. Nella console Google Cloud, vai a Funzioni Cloud Run.

      Vai alle funzioni Cloud Run

    2. Fai clic sulla casella di controllo della funzione da eliminare,pubsub-publisher.

    3. Fai clic su Elimina e segui le istruzioni.

Terraform

  1. Assicurati che il tuo script Terraform non contenga voci per le risorse ancora richieste dal tuo progetto. Ad esempio, potrebbe voler mantenere alcune API abilitate e IAM ancora assegnate (se hai aggiunto queste definizioni script Terraform).
  2. Esegui terraform destroy.
  3. Elimina manualmente il bucket dell'ambiente. Cloud Composer non lo elimina automaticamente. Puoi farlo da nella console Google Cloud o in Google Cloud CLI.

Passaggi successivi