Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina ti guida nella creazione di un'architettura push basata sugli eventi attivando i DAG di Cloud Composer in risposta alle modifiche degli argomenti Pub/Sub. Gli esempi di questo tutorial mostrano la gestione dell'intero ciclo di Pub/Sub, inclusa la gestione delle sottoscrizioni, come parte del processo DAG. È adatto ad alcuni casi d'uso comuni quando devi attivare i DAG, ma non vuoi configurare autorizzazioni di accesso aggiuntive.
Ad esempio, i messaggi inviati tramite Pub/Sub possono essere utilizzati come soluzione se non vuoi fornire l'accesso diretto a un ambiente Cloud Composer per motivi di sicurezza. Puoi configurare una funzione Cloud Run che crea messaggi Pub/Sub e li pubblica in un argomento Pub/Sub. Puoi quindi creare un DAG che recupera i messaggi Pub/Sub e li gestisce.
In questo esempio specifico, crei una funzione Cloud Run e implementi due DAG. Il primo DAG esegue il pull dei messaggi Pub/Sub e attiva il secondo DAG in base al contenuto del messaggio Pub/Sub.
Questo tutorial presuppone che tu abbia familiarità con Python e la console Google Cloud .
Obiettivi
Costi
Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:
- Cloud Composer (vedi anche costi aggiuntivi)
- Pub/Sub
- Cloud Run Functions
Al termine di questo tutorial, puoi evitare l'addebito di ulteriori costi eliminando le risorse create. Per maggiori dettagli, vedi Esegui la pulizia.
Prima di iniziare
Per questo tutorial, è necessario un Google Cloud progetto. Configura il progetto nel seguente modo:
Nella console Google Cloud , seleziona o crea un progetto:
Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.
Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:
- Utente service account (
roles/iam.serviceAccountUser
) - Editor Pub/Sub (
roles/pubsub.editor
) - Amministratore ambienti e oggetti Storage
(
roles/composer.environmentAndStorageObjectAdmin
) - Amministratore di Cloud Run Functions (
roles/cloudfunctions.admin
) - Visualizzatore log (
roles/logging.viewer
)
- Utente service account (
Assicurati che l'account di servizio che esegue la funzione Cloud Run disponga di autorizzazioni sufficienti nel tuo progetto per accedere a Pub/Sub. Per impostazione predefinita, le funzioni Cloud Run utilizzano l'account di servizio predefinito App Engine. Questo account di servizio ha il ruolo Editor, che dispone di autorizzazioni sufficienti per questo tutorial.
Abilitare le API per il progetto
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Abilita l'API Cloud Composer nel tuo progetto aggiungendo le seguenti definizioni di risorse allo script Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Sostituisci <PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio: example-project
.
Crea l'ambiente Cloud Composer
Crea un ambiente Cloud Composer 2.
Nell'ambito di questa procedura,
concedi il ruolo Cloud Composer v2 API Service Agent Extension
(roles/composer.ServiceAgentV2Ext
) all'account dell'agente di servizio Composer. Cloud Composer utilizza questo account per eseguire operazioni
nel tuo progetto Google Cloud .
crea un argomento Pub/Sub
Questo esempio attiva un DAG in risposta a un messaggio inviato a un argomento Pub/Sub. Crea un argomento Pub/Sub da utilizzare in questo esempio:
Console
Nella console Google Cloud , vai alla pagina Argomenti Pub/Sub.
Fai clic su Crea argomento.
Nel campo ID argomento, inserisci
dag-topic-trigger
come ID per l'argomento.Lascia invariate le altre opzioni predefinite.
Fai clic su Crea argomento.
gcloud
Per creare un argomento, esegui il comando gcloud pubsub topics create in Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Aggiungi le seguenti definizioni di risorse allo script Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Sostituisci <PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio: example-project
.
Caricare i DAG
Carica i DAG nel tuo ambiente:
- Salva il seguente file DAG sul computer locale.
- Sostituisci
<PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio:example-project
. - Carica il file DAG modificato nel tuo ambiente.
Il codice campione contiene due DAG: trigger_dag
e target_dag
.
Il DAG trigger_dag
si iscrive a un argomento Pub/Sub, recupera i messaggi Pub/Sub e attiva un altro DAG specificato nell'ID DAG dei dati dei messaggi Pub/Sub. In questo esempio, trigger_dag
attiva
il DAG target_dag
, che restituisce messaggi ai log delle attività.
Il DAG trigger_dag
contiene le seguenti attività:
subscribe_task
: Abbonati a un argomento Pub/Sub.pull_messages_operator
: leggi i dati di un messaggio Pub/Sub conPubSubPullOperator
.trigger_target_dag
: attiva un altro DAG (in questo esempio,target_dag
) in base ai dati nei messaggi estratti dall'argomento Pub/Sub.
Il DAG target_dag
contiene una sola attività: output_to_logs
. Questa attività
stampa i messaggi nel log delle attività con un ritardo di un secondo.
Esegui il deployment di una funzione Cloud Run che pubblica messaggi in un argomento Pub/Sub
In questa sezione, esegui il deployment di una funzione Cloud Run che pubblica messaggi in un argomento Pub/Sub.
Crea una funzione Cloud Run e specifica la sua configurazione
Console
Nella Google Cloud console, vai alla pagina Cloud Run Functions.
Fai clic su Crea funzione.
Nel campo Ambiente, seleziona 1ª gen..
Nel campo Nome funzione, inserisci il nome della funzione:
pubsub-publisher
.Nel campo Tipo di trigger, seleziona HTTP.
Nella sezione Autenticazione, seleziona Consenti chiamate non autenticate. Questa opzione concede agli utenti non autenticati la possibilità di richiamare una funzione HTTP.
Fai clic su Salva.
Fai clic su Avanti per passare al passaggio Codice.
Terraform
Ti consigliamo di utilizzare la console Google Cloud per questo passaggio, perché non esiste un modo semplice per gestire il codice sorgente della funzione da Terraform.
Questo esempio mostra come caricare una funzione Cloud Run da un file di archivio zip locale creando un bucket Cloud Storage, memorizzando il file in questo bucket e poi utilizzando il file dal bucket come origine per la funzione Cloud Run. Se utilizzi questo approccio, Terraform non aggiorna automaticamente il codice sorgente della funzione, anche se crei un nuovo file di archivio. Per ricaricare il codice della funzione, puoi modificare il nome del file dell'archivio.
- Scarica i file
pubsub_publisher.py
erequirements.txt
. - Nel file
pubsub_publisher.py
, sostituisci<PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio,example-project
. - Crea un archivio zip denominato
pubsub_function.zip
con il filepbusub_publisner.py
e il filerequirements.txt
. - Salva l'archivio ZIP in una directory in cui è memorizzato lo script Terraform.
- Aggiungi le seguenti definizioni di risorse allo script Terraform e
sostituisci
<PROJECT_ID>
con l'ID progetto del tuo progetto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Specifica i parametri del codice della funzione Cloud Run
Console
Nel passaggio Codice, nel campo Runtime, seleziona il runtime del linguaggio utilizzato dalla funzione. In questo esempio, seleziona Python 3.10.
Nel campo Entry point (Punto di ingresso), inserisci
pubsub_publisher
. Questo è il codice che viene eseguito quando viene eseguita la funzione Cloud Run. Il valore di questo flag deve essere un nome di funzione o un nome di classe completo che esiste nel codice sorgente.
Terraform
Ignora questo passaggio. I parametri della funzione Cloud Run sono già definiti nella risorsa google_cloudfunctions_function
.
Carica il codice della funzione Cloud Run
Console
Nel campo Codice sorgente, seleziona l'opzione appropriata per la modalità di fornitura del codice sorgente della funzione. In questo tutorial, aggiungi il codice della funzione utilizzando l'editor incorporato di Cloud Run Functions. In alternativa, puoi caricare un file ZIP o utilizzare Cloud Source Repositories.
- Inserisci il seguente esempio di codice nel file main.py.
- Sostituisci
<PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio:example-project
.
Terraform
Ignora questo passaggio. I parametri della funzione Cloud Run sono già definiti nella risorsa google_cloudfunctions_function
.
Specifica le dipendenze della funzione Cloud Run
Console
Specifica le dipendenze della funzione nel file di metadati requirements.txt:
Quando esegui il deployment della funzione, Cloud Run Functions scarica e installa
le dipendenze dichiarate nel file requirements.txt, una riga per pacchetto.
Questo file deve trovarsi nella stessa directory del file main.py che contiene
il codice della funzione. Per maggiori dettagli, consulta la sezione
File dei requisiti
nella documentazione di pip
.
Terraform
Ignora questo passaggio. Le dipendenze della funzione Cloud Run sono definite nel file requirements.txt
nell'archivio pubsub_function.zip
.
Esegui il deployment della funzione Cloud Run
Console
Fai clic su Esegui il deployment. Al termine del deployment, la funzione viene visualizzata con un segno di spunta verde nella pagina Funzioni Cloud Run della consoleGoogle Cloud .
Assicurati che il account di servizio che esegue la funzione Cloud Run disponga di autorizzazioni sufficienti nel tuo progetto per accedere a Pub/Sub.
Terraform
Inizializza Terraform:
terraform init
Rivedi la configurazione e verifica che le risorse che Terraform creerà o aggiornerà corrispondano alle tue aspettative:
terraform plan
Per verificare se la configurazione è valida, esegui questo comando:
terraform validate
Applica la configurazione Terraform eseguendo il seguente comando e inserendo yes al prompt:
terraform apply
Attendi che Terraform visualizzi il messaggio "Apply complete!" (Applicazione completata).
Nella console Google Cloud , vai alle risorse nell'interfaccia utente per assicurarti che Terraform le abbia create o aggiornate.
Testa la funzione Cloud Run
Per verificare che la funzione pubblichi un messaggio in un argomento Pub/Sub e che i DAG di esempio funzionino come previsto:
Verifica che i DAG siano attivi:
Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.
Vai alla scheda DAG.
Controlla i valori nella colonna State (Stato) per i DAG denominati
trigger_dag
etarget_dag
. Entrambi i DAG devono essere nello statoActive
.
Esegui il push di un messaggio Pub/Sub di prova. Puoi farlo in Cloud Shell:
Nella console Google Cloud , vai alla pagina Funzioni.
Fai clic sul nome della funzione,
pubsub-publisher
.Vai alla scheda Test.
Nella sezione Configura evento di attivazione, inserisci la seguente coppia chiave-valore JSON:
{"message": "target_dag"}
. Non modificare la coppia chiave-valore perché questo messaggio attiva il DAG di test in un secondo momento.Nella sezione Test Command (Comando di test), fai clic su Test in Cloud Shell (Testa in Cloud Shell).
Nel terminale Cloud Shell, attendi che venga visualizzato automaticamente un comando. Esegui questo comando premendo
Enter
.Se viene visualizzato il messaggio Autorizza Cloud Shell, fai clic su Autorizza.
Verifica che i contenuti del messaggio corrispondano al messaggio Pub/Sub. In questo esempio, il messaggio di output deve iniziare con
Message b'target_dag' with message_length 10 published to
come risposta della funzione.
Verifica che
target_dag
sia stato attivato:Attendi almeno un minuto affinché venga completata una nuova esecuzione DAG di
trigger_dag
.Nella console Google Cloud , vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome del tuo ambiente. Viene visualizzata la pagina Dettagli ambiente.
Vai alla scheda DAG.
Fai clic su
trigger_dag
per andare alla pagina Dettagli DAG. Nella scheda Esecuzioni viene visualizzato un elenco di esecuzioni DAG per il DAGtrigger_dag
.Questo DAG viene eseguito ogni minuto ed elabora tutti i messaggi Pub/Sub inviati dalla funzione. Se non sono stati inviati messaggi, l'attività
trigger_target
viene contrassegnata comeSkipped
nei log di esecuzione del DAG. Se sono stati attivati DAG, l'attivitàtrigger_target
è contrassegnata comeSuccess
.Esamina diverse esecuzioni di DAG recenti per individuare un'esecuzione di DAG in cui tutte e tre le attività (
subscribe_task
,pull_messages_operator
etrigger_target
) si trovano negli statiSuccess
.Torna alla scheda DAG e verifica che la colonna Esecuzioni riuscite per il DAG
target_dag
elenchi un'esecuzione riuscita.
Riepilogo
In questo tutorial hai imparato a utilizzare le funzioni Cloud Run per pubblicare messaggi in un argomento Pub/Sub e a eseguire il deployment di un DAG che si iscrive a un argomento Pub/Sub, recupera i messaggi Pub/Sub e attiva un altro DAG specificato nell'ID DAG dei dati del messaggio.
Esistono anche modi alternativi per creare e gestire le sottoscrizioni Pub/Sub e attivare i DAG che non rientrano nell'ambito di questo tutorial. Ad esempio, puoi utilizzare le funzioni Cloud Run per attivare i DAG Airflow quando si verifica un evento specificato. Dai un'occhiata ai nostri tutorial per provare le altre funzionalità diGoogle Cloud .
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.
Elimina il progetto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Elimina singole risorse
Se intendi esplorare più tutorial e guide rapide, il riuso dei progetti ti aiuta a non superare i limiti di quota.
Console
- Elimina l'ambiente Cloud Composer. Durante questa procedura, elimini anche il bucket dell'ambiente.
- Elimina l'argomento Pub/Sub,
dag-topic-trigger
. Elimina la funzione Cloud Run.
Nella console Google Cloud , vai a Cloud Run Functions.
Fai clic sulla casella di controllo della funzione da eliminare,
pubsub-publisher
.Fai clic su Elimina e poi segui le istruzioni.
Terraform
- Assicurati che lo script Terraform non contenga voci per risorse ancora richieste dal tuo progetto. Ad esempio, potresti voler mantenere alcune API abilitate e le autorizzazioni IAM ancora assegnate (se hai aggiunto queste definizioni allo script Terraform).
- Esegui
terraform destroy
. - Elimina manualmente il bucket dell'ambiente. Cloud Composer non lo elimina automaticamente. Puoi farlo dalla console Google Cloud o da Google Cloud CLI.
Passaggi successivi
- Test dei DAG
- Test delle funzioni HTTP
- Esegui il deployment di una funzione Cloud Run
- Prova le altre funzionalità di Google Cloud . Dai un'occhiata ai nostri tutorial.