Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questo tutorial mostra come utilizzare Cloud Composer per creare DAG Apache Airflow. La Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV archiviato in un bucket Cloud Storage ed esegue Job batch serverless Dataproc per elaborare i dati uniti.
Il set di dati pubblico BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici il globo. Il file CSV contiene informazioni sulle date e i nomi delle festività statunitensi dal 1997 al 2021.
La domanda a cui vogliamo rispondere utilizzando il DAG è: "Che temperatura faceva a Chicago in occasione del Giorno del Ringraziamento negli ultimi 25 anni?"
Obiettivi
- Crea un ambiente Cloud Composer nella configurazione predefinita
- crea un set di dati BigQuery vuoto
- Creare un nuovo bucket Cloud Storage
- Crea ed esegui un DAG che includa le seguenti attività:
- Carica un set di dati esterno da Cloud Storage in BigQuery
- Unire due set di dati in BigQuery
- Esegui un job PySpark di analisi dei dati
Prima di iniziare
Abilita API
Abilita le seguenti API:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Concedi le autorizzazioni
Concedi i ruoli e le autorizzazioni seguenti al tuo account utente:
Concedi ruoli per la gestione degli ambienti e dei bucket di ambienti Cloud Composer.
Concedi il ruolo BigQuery Data Owner (
roles/bigquery.dataOwner
) per creare un set di dati BigQuery.Concedi il ruolo Storage Admin (
roles/storage.admin
) per creare un bucket Cloud Storage.
Crea e prepara il tuo ambiente Cloud Composer
Crea un ambiente Cloud Composer con i parametri predefiniti:
- Scegli una regione con sede negli Stati Uniti.
- Scegli la versione più recente di Cloud Composer.
Concedi i seguenti ruoli all'account di servizio utilizzato nel tuo ambiente Cloud Composer affinché i worker Airflow possano eseguire correttamente le attività DAG:
- Utente BigQuery (
roles/bigquery.user
) - Proprietario dei dati BigQuery (
roles/bigquery.dataOwner
) - Utente account di servizio (
roles/iam.serviceAccountUser
) - Editor Dataproc (
roles/dataproc.editor
) - Worker Dataproc (
roles/dataproc.worker
)
- Utente BigQuery (
Creare risorse correlate
Crea un set di dati BigQuery vuoto con i seguenti parametri:
- Nome:
holiday_weather
- Regione:
US
- Nome:
Crea un nuovo bucket Cloud Storage nella regione multipla
US
.Esegui il seguente comando per abilitare l'accesso privato Google nella subnet predefinita della regione in cui vuoi eseguire Dataproc Serverless per soddisfare i requisiti di rete. Ti consigliamo di utilizzare la stessa regione del tuo ambiente Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Elaborazione dati con Dataproc Serverless
Esplora il job PySpark di esempio
Il codice riportato di seguito è un esempio di job PySpark che converte la temperatura da décimos de grado Celsius a gradi Celsius. Questo job converte di temperatura del set di dati in un formato diverso.
Carica i file di supporto su Cloud Storage
Per caricare il file PySpark e il set di dati archiviato in holidays.csv
:
Salva data_analytics_process.py sulla tua macchina locale.
Salva holidays.csv sul computer locale.
Nella console Google Cloud, vai alla pagina Browser Cloud Storage:
Fai clic sul nome del bucket creato in precedenza.
Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file, seleziona
data_analytics_process.py
eholidays.csv
nella finestra di dialogo visualizzata e fai clic su Apri.
DAG di analisi dei dati
Esplora il DAG di esempio
Il DAG utilizza più operatori per trasformare e unificare i dati:
La
GCSToBigQueryOperator
importa il file holidays.csv da Cloud Storage in una nuova tabella in BigQueryholidays_weather
set di dati che hai creato in precedenza.DataprocCreateBatchOperator
crea ed esegue un job batch PySpark utilizzando Dataproc Serverless.La
BigQueryInsertJobOperator
unisce i dati di holidays.csv nella "Data" colonna con i dati meteorologici del set di dati pubblico di BigQuery ghcn_d. Le attività diBigQueryInsertJobOperator
sono generate dinamicamente utilizzando un ciclo for e queste attivitàTaskGroup
per una migliore leggibilità nella vista grafico della UI di Airflow.
Utilizzare l'UI di Airflow per aggiungere variabili
In Airflow, variables rappresentano un metodo universale per memorizzare e recuperare impostazioni arbitrarie o un semplice archivio di coppie chiave-valore. Questo DAG utilizza le variabili Airflow per memorizzare i valori comuni. Per aggiungerle al tuo ambiente:
Accedi all'interfaccia utente di Airflow dalla console Cloud Composer.
Vai ad Amministrazione > Variabili.
Aggiungi le seguenti variabili:
gcp_project
: il tuo ID progetto.gcs_bucket
: il nome del bucket che hai creato in precedenza (senza il prefissogs://
).gce_region
: la regione in cui vuoi che il tuo job Dataproc soddisfi i requisiti di rete di Dataproc Serverless. Si tratta della regione in cui hai attivato l'accesso privato Google in precedenza.dataproc_service_account
: l'account di servizio per il tuo ambiente Cloud Composer. Puoi trovare questo servizio nella scheda di configurazione dell'ambiente nell'ambiente Cloud Composer.
Carica il DAG nel bucket del tuo ambiente
Cloud Composer pianifica i DAG che si trovano
/dags
cartella nel bucket dell'ambiente. Per caricare il DAG utilizzando la console Google Cloud:
Sul computer locale, risparmia data_analytics_dag.py.
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Viene aperta la cartella DAG del tuo ambiente.
Fai clic su Carica file.
Seleziona
data_analytics_dag.py
sulla macchina locale e fai clic su Apri.
Attiva il DAG
Nell'ambiente Cloud Composer, fai clic sulla scheda DAG.
Fai clic sull'ID DAG
data_analytics_dag
.Fai clic su Attiva DAG.
Attendi circa cinque-dieci minuti finché non viene visualizzato un segno di spunta verde che indica che le attività sono state completate correttamente.
Convalida l'esito positivo del DAG
Nella console Google Cloud, vai alla pagina BigQuery.
Nel riquadro Explorer, fai clic sul nome del progetto.
Fai clic su
holidays_weather_joined
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna valore sono in decimi di grado Celsius.
Fai clic su
holidays_weather_normalized
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella sono in gradi Celsius.
Approfondimento con Dataproc Serverless (facoltativo)
Puoi provare una versione avanzata di questo DAG con un PySpark più complesso flusso di elaborazione dei dati. Consulta l'estensione Dataproc per l'esempio di analisi dei dati su GitHub.
Esegui la pulizia
Elimina le singole risorse che hai creato per questo tutorial:
Elimina il bucket Cloud Storage che hai creato per questo tutorial.
Eliminare l'ambiente Cloud Composer, ad esempio manualmente il bucket dell'ambiente.
Passaggi successivi
- Esegui un DAG di analisi dei dati in Google Cloud utilizzando i dati di AWS.
- Esegui un DAG di Data Analytics in Azure.