Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina illustra la creazione di un'architettura di push basata su eventi attivazione dei DAG di Cloud Composer in risposta a Pub/Sub modifiche all'argomento. Gli esempi in questo tutorial mostrano la gestione dell'intero ciclo della gestione di Pub/Sub, inclusa la gestione degli abbonamenti, nell'ambito del processo DAG. È adatto per alcuni dei casi d'uso comuni in cui devono attivare i DAG, ma non vogliono configurare autorizzazioni di accesso aggiuntive.
Ad esempio, i messaggi inviati tramite Pub/Sub possono essere usati come soluzione se non vuoi fornire l'accesso diretto a Cloud Composer per motivi di sicurezza. Puoi configurare la funzione Cloud Function che crea messaggi Pub/Sub le pubblica su un argomento Pub/Sub. Puoi quindi creare un DAG esegue il pull dei messaggi Pub/Sub e li gestisce.
In questo esempio specifico, crei una Cloud Function ed esegui il deployment due DAG. Il primo DAG esegue il pull dei messaggi Pub/Sub e attiva il secondo DAG in base al contenuto del messaggio Pub/Sub.
Questo tutorial presuppone che tu abbia familiarità con Python e con la console Google Cloud.
Obiettivi
Costi
Questo tutorial utilizza i seguenti componenti fatturabili di Google Cloud:
- Cloud Composer (vedi anche costi aggiuntivi)
- Pub/Sub
- Cloud Functions
Dopo aver completato questo tutorial, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori dettagli, consulta Pulizia.
Prima di iniziare
Per questo tutorial è necessario un account Google Cloud progetto. Configura il progetto nel seguente modo:
Nella console Google Cloud, seleziona o crea un progetto:
Verifica che la fatturazione sia attivata per il tuo progetto. Scopri come verificare se la fatturazione è abilitata per un progetto.
Assicurati che l'utente del progetto Google Cloud disponga dei seguenti ruoli per creare le risorse necessarie:
- Utente account di servizio (
roles/iam.serviceAccountUser
) - Editor Pub/Sub (
roles/pubsub.editor
) - Amministratore oggetti di ambiente e archiviazione
(
roles/composer.environmentAndStorageObjectAdmin
) - Amministratore Cloud Functions (
roles/cloudfunctions.admin
) - Visualizzatore log (
roles/logging.viewer
)
- Utente account di servizio (
Assicurati che account di servizio che esegue la funzione Cloud Functions disponga di autorizzazioni sufficienti nel progetto per accedere a Pub/Sub. Di per impostazione predefinita, Cloud Functions utilizza Account di servizio predefinito di App Engine. Questo account di servizio ha il ruolo Editor, che dispone di una autorizzazioni per questo tutorial.
Abilita le API per il progetto
Console
Abilita le API Cloud Composer, Cloud Functions, and Pub/Sub.
gcloud
Abilita le API Cloud Composer, Cloud Functions, and Pub/Sub.
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Abilita l'API Cloud Composer nel tuo progetto aggiungendo quanto segue le definizioni delle risorse per lo script Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Sostituisci <PROJECT_ID>
con l'ID progetto
del tuo progetto. Ad esempio: example-project
.
crea il tuo ambiente Cloud Composer
Crea un ambiente Cloud Composer 2.
Nell'ambito di questa procedura,
devi concedere l'estensione agente di servizio API Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) all'agente di servizio Composer
. Cloud Composer utilizza questo account per eseguire operazioni
nel tuo progetto Google Cloud.
crea un argomento Pub/Sub
Questo esempio attiva un DAG in risposta a un messaggio inviato tramite push a un Pub/Sub. Crea un argomento Pub/Sub da utilizzare in questo esempio:
Console
Nella console Google Cloud, vai alla pagina Argomenti Pub/Sub.
Fai clic su Crea argomento.
Nel campo ID argomento, inserisci
dag-topic-trigger
come ID per per ogni argomento.Non modificare le altre impostazioni predefinite.
Fai clic su Crea argomento.
gcloud
Per creare un argomento, esegui gcloud pubsub topics create in Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Aggiungi le seguenti definizioni di risorse allo script Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Sostituisci <PROJECT_ID>
con l'ID progetto
del tuo progetto. Ad esempio: example-project
.
Carica i DAG
Carica i DAG nel tuo ambiente:
- Salva il seguente file DAG sul tuo computer locale.
- Sostituisci
<PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio:example-project
. - Carica il file DAG modificato nel tuo ambiente.
Il codice campione contiene due DAG: trigger_dag
e target_dag
.
Il DAG trigger_dag
sottoscrive un argomento Pub/Sub, esegue il pull
di messaggi Pub/Sub e attiva un altro DAG specificato nell'ID DAG
dei dati dei messaggi Pub/Sub. In questo esempio, trigger_dag
trigger
il DAG target_dag
, che invia messaggi ai log delle attività.
Il DAG trigger_dag
contiene le seguenti attività:
subscribe_task
: iscriviti a un argomento Pub/Sub.pull_messages_operator
: lettura dei dati di un messaggio Pub/Sub conPubSubPullOperator
.trigger_target_dag
: attiva un altro DAG (in questo esempio,target_dag
) in base ai dati nei messaggi di cui è stato eseguito il pull da Pub/Sub per ogni argomento.
Il DAG target_dag
contiene una sola attività: output_to_logs
. Questa attività
stampa i messaggi nel log delle attività con un secondo di ritardo.
Esegui il deployment di una Cloud Function che pubblica messaggi in un argomento Pub/Sub
In questa sezione eseguirai il deployment di una Cloud Function che pubblica messaggi in un argomento Pub/Sub.
crea una Cloud Function e specificane la configurazione
Console
Nella console Google Cloud, vai alla pagina Cloud Functions.
Fai clic su Crea funzione.
Nel campo Ambiente, seleziona 1a generazione.
Nel campo Nome funzione, inserisci il nome della funzione:
pubsub-publisher
.Nel campo Tipo di trigger, seleziona HTTP.
Nella sezione Autenticazione, seleziona Consenti chiamate non autenticate. Questa opzione concede utenti non autenticati di richiamare una funzione HTTP.
Fai clic su Salva.
Fai clic su Avanti per andare al passaggio Codice.
Terraform
Valuta la possibilità di utilizzare la console Google Cloud per questo passaggio, poiché non sono un modo semplice per gestire il codice sorgente della funzione da Terraform.
Questo esempio mostra come caricare una Cloud Function da un file di archivio ZIP locale creando un bucket Cloud Storage, per archiviare il file in questo bucket, utilizzandolo nel bucket come per la funzione Cloud Function. Se utilizzi questo approccio, Terraform non aggiorna automaticamente il codice sorgente della funzione, anche se crei un nuovo file di archivio. Per ricaricare il codice della funzione, devi modificare il nome file dell'archivio.
- Scarica il
pubsub_publisher.py
e airequirements.txt
. - Nel file
pubsub_publisher.py
, sostituisci<PROJECT_ID>
con il ID del progetto. Per ad esempioexample-project
. - Crea un archivio ZIP denominato
pubsub_function.zip
conpbusub_publisner.py
e il filerequirements.txt
. - Salva l'archivio ZIP in una directory in cui è archiviato lo script Terraform.
- Aggiungi le seguenti definizioni di risorse allo script Terraform e
sostituisci
<PROJECT_ID>
con l'ID del progetto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Specifica i parametri del codice della funzione Cloud Functions
Console
Nel passaggio Codice, seleziona la lingua nel campo Runtime il runtime utilizzato dalla funzione. In questo esempio, seleziona Python 3.10.
Nel campo Punto di ingresso, inserisci
pubsub_publisher
. Questo è il codice eseguito durante l'esecuzione della Cloud Function. Il valore di questo flag deve essere il nome di una funzione o un nome completo di classe che nel codice sorgente.
Terraform
Ignora questo passaggio. I parametri della Cloud Function sono già definiti in
la risorsa google_cloudfunctions_function
.
Carica il codice della Cloud Function
Console
Nel campo Codice sorgente, seleziona l'opzione appropriata per fornisci il codice sorgente della funzione. In questo tutorial, aggiungi il codice della tua funzione utilizzando le funzioni Cloud Functions Editor incorporato. In alternativa, puoi caricare un file ZIP o utilizzare Cloud Source Repositories.
- Inserisci il seguente esempio di codice nel file main.py.
- Sostituisci
<PROJECT_ID>
con l'ID progetto del tuo progetto. Ad esempio:example-project
.
Terraform
Ignora questo passaggio. I parametri della Cloud Function sono già definiti in
la risorsa google_cloudfunctions_function
.
Specifica le dipendenze della Cloud Function
Console
Specifica le dipendenze della funzione nel file di metadati requirements.txt:
Quando esegui il deployment della funzione, Cloud Functions scarica e installa
delle dipendenze dichiarate nel file requirements.txt, una riga per pacchetto.
Questo file deve trovarsi nella stessa directory del file main.py che contiene
il codice della funzione. Per ulteriori dettagli, vedi
File dei requisiti
nella documentazione di pip
.
Terraform
Ignora questo passaggio. Le dipendenze della Cloud Function sono definite
il file requirements.txt
nell'archivio pubsub_function.zip
.
Esegui il deployment della Cloud Function
Console
Fai clic su Esegui il deployment. Al termine del deployment, la funzione viene visualizzata con un segno di spunta verde nella pagina Cloud Functions in nella console Google Cloud.
Assicurati che l'account di servizio che esegue la Cloud Function disponi di autorizzazioni sufficienti nel tuo progetto per accedere in Pub/Sub.
Terraform
Inizializza Terraform:
terraform init
Rivedi la configurazione e verifica che le risorse di Terraform che crei o aggiorni in base alle tue aspettative:
terraform plan
Per verificare se la configurazione è valida, esegui questo comando: :
terraform validate
Applica la configurazione Terraform eseguendo il comando seguente inserendo sì nel prompt:
terraform apply
Attendi finché Terraform non visualizzi il messaggio "Applicazione completata!". per creare un nuovo messaggio email.
Nella console Google Cloud, accedi alle risorse nella UI per eseguire che Terraform li abbia creati o aggiornati.
Testa la Cloud Function
per verificare che la funzione pubblichi un messaggio in un argomento Pub/Sub. e che i DAG di esempio funzionino come previsto:
Controlla che i DAG siano attivi:
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda DAG.
Verifica i valori nella colonna Stato per i DAG denominati
trigger_dag
etarget_dag
. Entrambi i DAG devono essere nello statoActive
.
Esegui il push di un messaggio Pub/Sub di prova. Puoi farlo in Cloud Shell:
Nella console Google Cloud, vai alla pagina Funzioni.
Fai clic sul nome della funzione,
pubsub-publisher
.Vai alla scheda Test.
Nella sezione Configura evento di attivazione, inserisci quanto segue. Chiave-valore JSON:
{"message": "target_dag"}
. Non modificare la coppia chiave-valore. perché questo messaggio attiva il DAG di test in un secondo momento.Nella sezione Comando di test, fai clic su Testa in Cloud Shell.
Nel terminale Cloud Shell, attendi che venga visualizzato un comando automaticamente. Esegui questo comando premendo
Enter
.Se viene visualizzato il messaggio Autorizza Cloud Shell, fai clic su Autorizza.
Verifica che i contenuti del messaggio corrispondano a Pub/Sub per creare un nuovo messaggio email. In questo esempio, il messaggio di output deve iniziare con
Message b'target_dag' with message_length 10 published to
come dalla tua funzione.
Verifica che
target_dag
sia stato attivato:Attendi almeno un minuto, in modo che una nuova esecuzione del DAG di
trigger_dag
vengono completate.Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul nome dell'ambiente. Si apre la pagina Dettagli ambiente.
Vai alla scheda DAG.
Fai clic su
trigger_dag
per passare alla pagina Dettagli DAG. Nella scheda Corsa , viene visualizzato un elenco delle esecuzioni di DAG per il DAGtrigger_dag
.Questo DAG viene eseguito ogni minuto ed elabora tutti i Pub/Sub di messaggi inviati dalla funzione. Se non è stato inviato nessun messaggio, L'attività
trigger_target
è contrassegnata comeSkipped
nei log di esecuzione dei DAG. Se I DAG sono stati attivati, quindi l'attivitàtrigger_target
è contrassegnata comeSuccess
.Esamina diverse esecuzioni recenti di DAG per individuare un'esecuzione di DAG in cui tre attività (
subscribe_task
,pull_messages_operator
etrigger_target
) sono in statoSuccess
.Torna alla scheda DAG e controlla che lo stato Esecuzioni riuscite per il DAG
target_dag
elenca un'esecuzione riuscita.
Riepilogo
In questo tutorial hai imparato a utilizzare Cloud Functions per pubblicare in un argomento Pub/Sub ed eseguire il deployment di un DAG che sottoscrive una argomento Pub/Sub, esegue il pull di messaggi Pub/Sub e trigger a un altro DAG specificato nell'ID DAG dei dati dei messaggi.
Esistono anche metodi alternativi per creazione e gestione di sottoscrizioni Pub/Sub e l'attivazione di DAG esterni nell'ambito di questo tutorial. Ad esempio, puoi usa Cloud Functions per attivare i DAG Airflow quando si verifica un evento specificato. Dai un'occhiata ai nostri tutorial per provare gli altri le funzionalità di Google Cloud per te.
Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse usati in questo tutorial, elimina il progetto che contiene le risorse o mantenere il progetto ed eliminare le singole risorse.
Elimina il progetto
Elimina un progetto Google Cloud:
gcloud projects delete PROJECT_ID
Elimina singole risorse
Se prevedi di esplorare più tutorial e guide rapide, puoi riutilizzare i progetti può aiutarti a evitare di superare i limiti di quota.
Console
- Eliminare l'ambiente Cloud Composer. Inoltre elimina il bucket dell'ambiente durante questa procedura.
- Elimina l'argomento Pub/Sub,
dag-topic-trigger
. Elimina la Cloud Function.
Nella console Google Cloud, vai a Cloud Functions.
Fai clic sulla casella di controllo della funzione da eliminare,
pubsub-publisher
.Fai clic su Elimina e segui le istruzioni.
Terraform
- Assicurati che il tuo script Terraform non contenga voci per le risorse ancora richieste dal tuo progetto. Ad esempio, potrebbe voler mantenere alcune API abilitate e IAM ancora assegnate (se hai aggiunto queste definizioni script Terraform).
- Esegui
terraform destroy
. - Eliminare manualmente il bucket dell'ambiente. Cloud Composer non la elimina automaticamente. Puoi farlo da nella console Google Cloud o in Google Cloud CLI.
Passaggi successivi
- Test dei DAG
- Test delle funzioni HTTP
- Esegui il deployment di una funzione Cloud Functions
- Prova le altre funzionalità di Google Cloud. Consulta questi dati: i nostri tutorial.