Cloud Composer 1 | Cloud Composer 2
Questo tutorial è una modifica di Esegui un DAG di analisi dei dati in Google Cloud che mostra come connettere il tuo ambiente Cloud Composer a Microsoft Azure per utilizzare i dati archiviati. Mostra come utilizzare Cloud Composer per creare un DAG Apache Airflow. Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV archiviato in un Archiviazione BLOB di Azure, quindi esegue un job batch Dataproc Serverless per elaborare i dati uniti.
Il set di dati pubblico BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici in tutto il mondo. Il file CSV contiene informazioni sulle date e i nomi delle festività degli Stati Uniti dal 1997 al 2021.
La domanda a cui vogliamo rispondere utilizzando il DAG è: "Che caldo ha fatto a Chicago il Giorno del Ringraziamento negli ultimi 25 anni?"
Obiettivi
- Crea un ambiente Cloud Composer nella configurazione predefinita
- Crea un blob in Azure
- crea un set di dati BigQuery vuoto
- Crea un nuovo bucket Cloud Storage
- Crea ed esegui un DAG che includa le seguenti attività:
- Carica un set di dati esterno da Azure Blob Storage a Cloud Storage
- Carica un set di dati esterno da Cloud Storage a BigQuery
- Unire due set di dati in BigQuery
- Esegui un job PySpark di analisi dei dati
Prima di iniziare
Abilita le API
Abilita le seguenti API:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Concedi le autorizzazioni
Concedi i ruoli e le autorizzazioni seguenti al tuo account utente:
Concedi i ruoli per la gestione degli ambienti e dei bucket di ambienti Cloud Composer.
Concedi il ruolo Proprietario dati BigQuery (
roles/bigquery.dataOwner
) per creare un set di dati BigQuery.Concedi il ruolo Amministratore Storage (
roles/storage.admin
) per creare un bucket Cloud Storage.
Crea e prepara il tuo ambiente Cloud Composer
Crea un ambiente Cloud Composer con parametri predefiniti:
- Scegli una regione con sede negli Stati Uniti.
- Scegli la versione più recente di Cloud Composer.
Concedi i ruoli seguenti all'account di servizio utilizzato nel tuo ambiente Cloud Composer affinché i worker di Airflow possano eseguire correttamente le attività DAG:
- Utente BigQuery (
roles/bigquery.user
) - Proprietario dati BigQuery (
roles/bigquery.dataOwner
) - Utente account di servizio (
roles/iam.serviceAccountUser
) - Editor Dataproc (
roles/dataproc.editor
) - Worker Dataproc (
roles/dataproc.worker
)
- Utente BigQuery (
Creare e modificare le risorse correlate in Google Cloud
Installa il pacchetto PyPI
apache-airflow-providers-microsoft-azure
nel tuo ambiente Cloud Composer.Crea un set di dati BigQuery vuoto con i seguenti parametri:
- Nome:
holiday_weather
- Regione:
US
- Nome:
Crea un nuovo bucket Cloud Storage nella località multiregionale
US
.Esegui questo comando per abilitare l'accesso privato Google nella subnet predefinita nella regione in cui vuoi eseguire Dataproc Serverless per soddisfare i requisiti di rete. Ti consigliamo di utilizzare la stessa regione del tuo ambiente Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crea risorse correlate in Azure
Crea un account di archiviazione con le impostazioni predefinite.
Recupera la chiave di accesso e la stringa di connessione per il tuo account di archiviazione.
Crea un container con le opzioni predefinite nell'account di archiviazione appena creato.
Concedi il ruolo di Delegatore BLOB Storage per il container creato nel passaggio precedente.
Carica holidays.csv per creare un BLOB a blocchi con le opzioni predefinite nel portale Azure.
Crea un token SAS per il BLOB a blocchi che hai creato nel passaggio precedente nel portale Azure.
- Metodo di firma: chiave di delega utente
- Autorizzazioni: leggi
- Indirizzo IP consentito: nessuno
- Protocolli consentiti: solo HTTPS
Connettiti ad Azure da Cloud Composer
Aggiungi la connessione di Microsoft Azure utilizzando l'interfaccia utente di Airflow:
Vai ad Amministrazione > Connessioni.
Crea una nuova connessione con la seguente configurazione:
- ID connessione:
azure_blob_connection
- Tipo di connessione:
Azure Blob Storage
- Accesso allo spazio di archiviazione BLOB:il nome dell'account di archiviazione
- Chiave di archiviazione BLOB:la chiave di accesso per l'account di archiviazione.
- Stringa di connessione dell'account di archiviazione BLOB: la stringa di connessione dell'account di archiviazione.
- Token SAS:il token SAS generato dal BLOB
- ID connessione:
Elaborazione dati con Dataproc Serverless
Esplora il job PySpark di esempio
Il codice mostrato di seguito è un esempio di job PySpark che converte la temperatura da decimi di grado in Celsius a gradi Celsius. Questo job converte i dati sulla temperatura dal set di dati in un formato diverso.
Carica il file PySpark su Cloud Storage
Per caricare il file PySpark in Cloud Storage:
Salva data_analytics_process.py sulla tua macchina locale.
Nella console Google Cloud, vai alla pagina Browser Cloud Storage:
Fai clic sul nome del bucket che hai creato in precedenza.
Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file, seleziona
data_analytics_process.py
nella finestra di dialogo visualizzata e fai clic su Apri.
DAG di analisi dei dati
Esplora il DAG di esempio
Il DAG utilizza più operatori per trasformare e unificare i dati:
Il
AzureBlobStorageToGCSOperator
trasferisce il file holidays.csv dal BLOB a blocchi Azure al bucket Cloud Storage.GCSToBigQueryOperator
importa il file holidays.csv da Cloud Storage in una nuova tabella nel set di dati BigQueryholidays_weather
che hai creato in precedenza.DataprocCreateBatchOperator
crea ed esegue un job batch PySpark utilizzando Dataproc Serverless.BigQueryInsertJobOperator
unisce i dati di holidays.csv nella colonna "Data" ai dati meteo del set di dati pubblico BigQuery ghcn_d. Le attivitàBigQueryInsertJobOperator
vengono generate dinamicamente utilizzando un ciclo for e si trovano in unTaskGroup
per una migliore leggibilità nella visualizzazione del grafico dell'interfaccia utente di Airflow.
Utilizzare l'interfaccia utente di Airflow per aggiungere variabili
In Airflow, le variabili offrono un modo universale per archiviare e recuperare impostazioni o configurazioni arbitrarie come un semplice archivio chiave-valore. Questo DAG utilizza le variabili Airflow per archiviare i valori comuni. Per aggiungerli al tuo ambiente:
Vai ad Amministrazione > Variabili.
Aggiungi le seguenti variabili:
gcp_project
: il tuo ID progetto.gcs_bucket
: il nome del bucket che hai creato in precedenza (senza il prefissogs://
).gce_region
: la regione in cui vuoi che il job Dataproc soddisfi i requisiti di networking di Dataproc Serverless. Questa è la regione in cui hai attivato l'accesso privato Google in precedenza.dataproc_service_account
: l'account di servizio per il tuo ambiente Cloud Composer. Puoi trovare questo account di servizio nella scheda di configurazione dell'ambiente per il tuo ambiente Cloud Composer.azure_blob_name
: il nome del BLOB che hai creato in precedenza.azure_container_name
: il nome del contenitore creato in precedenza.
Carica il DAG nel bucket del tuo ambiente
Cloud Composer pianifica i DAG che si trovano nella
cartella /dags
nel bucket del tuo ambiente. Per caricare il DAG utilizzando la console Google Cloud:
Sulla macchina locale, salva azureblobstoretogcsoperator_tutorial.py.
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Si apre la cartella DAG del tuo ambiente.
Fai clic su Carica file.
Seleziona
azureblobstoretogcsoperator_tutorial.py
sulla macchina locale e fai clic su Apri.
Attiva il DAG
Nel tuo ambiente Cloud Composer, fai clic sulla scheda DAG.
Fai clic sull'ID DAG
azure_blob_to_gcs_dag
.Fai clic su Attiva DAG.
Attendi circa 5-10 minuti finché non viene visualizzato un segno di spunta verde che indica che le attività sono state completate correttamente.
Convalidare il successo del DAG
Nella console Google Cloud, vai alla pagina BigQuery.
Nel riquadro Explorer, fai clic sul nome del progetto.
Fai clic su
holidays_weather_joined
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna del valore sono in decimi di grado Celsius.
Fai clic su
holidays_weather_normalized
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna del valore sono in gradi Celsius.
esegui la pulizia
Elimina le singole risorse che hai creato per questo tutorial:
Elimina il bucket Cloud Storage che hai creato per questo tutorial.
Elimina l'ambiente Cloud Composer, compresa l'eliminazione manuale del bucket dell'ambiente.