Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questo tutorial è una modifica di Eseguire un DAG di analisi dei dati in Google Cloud che mostra come collegare l'ambiente Cloud Composer ad Amazon Web Services per utilizzare i dati memorizzati al suo interno. Mostra come utilizzare Cloud Composer per creare un DAG di Apache Airflow. Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV memorizzato in un bucket S3 di Amazon Web Services (AWS), quindi esegue un job batch Dataproc Serverless per elaborare i dati uniti.
Il set di dati pubblico BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici il globo. Il file CSV contiene informazioni sulle date e i nomi delle festività statunitensi dal 1997 al 2021.
La domanda a cui vogliamo rispondere utilizzando il DAG è: "Che temperatura faceva a Chicago in occasione del Giorno del Ringraziamento negli ultimi 25 anni?"
Obiettivi
- Crea un ambiente Cloud Composer nella configurazione predefinita
- Crea un bucket in AWS S3
- crea un set di dati BigQuery vuoto
- Creare un nuovo bucket Cloud Storage
- Crea ed esegui un DAG che includa le seguenti attività:
- Carica un set di dati esterno da S3 a Cloud Storage
- carica un set di dati esterno da Cloud Storage BigQuery
- Unisci due set di dati in BigQuery
- Esegui un job PySpark di analisi dei dati
Prima di iniziare
Gestire le autorizzazioni in AWS
Segui la sezione "Creazione di criteri con l'editor visivo" del tutorial sulla creazione di criteri IAM AWS per creare un criterio IAM personalizzato per AWS S3 con la seguente configurazione:
- Servizio:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
), per visualizzare il bucket S3 - CreateBucket (
s3:CreateBucket
), per creare un bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
), per creare un bucket - ListBucket (
s3:ListBucket
), per concedere l'autorizzazione a elencare gli oggetti in un bucket S3 - PutObject (
s3:PutObject
), per caricare file in un bucket - GetBucketVersioning (
s3:GetBucketVersioning
), per eliminare un oggetto in un bucket - DeleteObject (
s3:DeleteObject
), per eliminare un oggetto in un bucket - ListBucketVersions (
s3:ListBucketVersions
), per l'eliminazione di un bucket - DeleteBucket (
s3:DeleteBucket
), per eliminare un bucket - Risorse: scegli "Qualsiasi" accanto a "bucket" e "oggetto" per concedere le autorizzazioni a qualsiasi risorsa di quel tipo.
- Tag: nessuno
- Nome: TutorialPolicy
Consulta l'elenco delle azioni supportate in Amazon S3 per ulteriori informazioni su ciascuna configurazione trovata sopra.
Abilita API
Abilita le seguenti API:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Concedi le autorizzazioni
Concedi i ruoli e le autorizzazioni seguenti al tuo account utente:
Concedere i ruoli per la gestione degli ambienti e dei bucket di ambiente di Cloud Composer.
Concedi il ruolo BigQuery Data Owner (
roles/bigquery.dataOwner
) per creare un set di dati BigQuery.Concedi il ruolo Amministratore Storage (
roles/storage.admin
) per creare un bucket Cloud Storage.
Crea e prepara l'ambiente Cloud Composer
Crea un ambiente Cloud Composer con impostazione predefinita parametri:
- Scegli una regione con sede negli Stati Uniti.
- Scegli la versione Cloud Composer più recente.
Concedi i seguenti ruoli all'account di servizio utilizzato in dell'ambiente Cloud Composer affinché i worker di Airflow possano eseguire correttamente le attività DAG:
- Utente BigQuery (
roles/bigquery.user
) - Proprietario dati BigQuery (
roles/bigquery.dataOwner
) - Utente account di servizio (
roles/iam.serviceAccountUser
) - Editor Dataproc (
roles/dataproc.editor
) - Worker Dataproc (
roles/dataproc.worker
)
- Utente BigQuery (
Creare e modificare le risorse correlate in Google Cloud
Installa
apache-airflow-providers-amazon
pacchetto PyPI nel tuo nell'ambiente Cloud Composer.Crea un set di dati BigQuery vuoto con i seguenti parametri:
- Nome:
holiday_weather
- Regione:
US
- Nome:
Crea un nuovo bucket Cloud Storage nell'area multiregionale
US
.Esegui il seguente comando per abilitare l'accesso privato Google nella subnet predefinita della regione in cui vuoi eseguire Dataproc Serverless per soddisfare i requisiti di rete. Me di utilizzare la stessa regione di Cloud Composer completamente gestito di Google Cloud.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Creare risorse correlate in AWS
Crea un bucket S3 con le impostazioni predefinite nella tua regione preferita.
Connettiti ad AWS da Cloud Composer
- Ottieni l'ID chiave di accesso e la chiave di accesso segreta AWS
Aggiungi la connessione AWS S3 utilizzando l'interfaccia utente di Airflow:
- Vai ad Amministrazione > Connessioni.
Crea una nuova connessione con la seguente configurazione:
- ID connessione:
aws_s3_connection
- Tipo di connessione:
Amazon S3
- Extra:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID connessione:
Elaborazione dei dati utilizzando Dataproc Serverless
Esplora il job PySpark di esempio
Il codice riportato di seguito è un esempio di job PySpark che converte la temperatura da décimos de grado Celsius a gradi Celsius. Questo job converte di temperatura del set di dati in un formato diverso.
Carica il file PySpark in Cloud Storage
Per caricare il file PySpark in Cloud Storage:
Salva data_analytics_process.py al tuo computer locale.
Nella console Google Cloud, vai alla pagina Browser Cloud Storage:
Fai clic sul nome del bucket creato in precedenza.
Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file, selezionate
data_analytics_process.py
nella finestra di dialogo visualizzata e fai clic su Apri.
Carica il file CSV su AWS S3
Per caricare il file holidays.csv
:
- Salva
holidays.csv
sulla tua macchina locale. - Segui le Guida di AWS per caricare il file nel bucket.
DAG di analisi dei dati
Esplora il DAG di esempio
Il DAG utilizza più operatori per trasformare e unificare i dati:
La
S3ToGCSOperator
trasferisce il file holidays.csv dal tuo dal bucket AWS S3 al bucket Cloud Storage.GCSToBigQueryOperator
importa il file holidays.csv da Cloud Storage in una nuova tabella del set di dati BigQueryholidays_weather
che hai creato in precedenza.DataprocCreateBatchOperator
crea ed esegue un job batch PySpark utilizzando Dataproc Serverless.La
BigQueryInsertJobOperator
unisce i dati di holidays.csv nella "Data" colonna con i dati meteorologici del set di dati pubblico di BigQuery ghcn_d. Le attivitàBigQueryInsertJobOperator
vengono generate dinamicamente utilizzando un ciclo for e si trovano in unTaskGroup
per una migliore leggibilità nella visualizzazione del grafico dell'interfaccia utente di Airflow.
Utilizzare l'interfaccia utente di Airflow per aggiungere variabili
In Airflow, le variabili sono un modo universale per archiviare e recuperare impostazioni o configurazioni arbitrarie come un semplice archivio di chiavi e valori. Questo DAG utilizza le variabili Airflow per memorizzare i valori comuni. Per aggiungerli al tuo ambiente:
Accedi all'interfaccia utente di Airflow dalla console Cloud Composer.
Vai ad Amministrazione > Variabili.
Aggiungi le seguenti variabili:
s3_bucket
: il nome del bucket S3 che hai creato in precedenza.gcp_project
: il tuo ID progetto.gcs_bucket
: il nome del bucket che hai creato in precedenza (senza il prefissogs://
).gce_region
: la regione in cui vuoi che il tuo job Dataproc soddisfi i requisiti di rete di Dataproc Serverless. Si tratta della regione in cui hai attivato l'accesso privato Google in precedenza.dataproc_service_account
: l'account di servizio per nell'ambiente Cloud Composer. Puoi trovare questo account servizio nella scheda di configurazione dell'ambiente per il tuo ambiente Cloud Composer.
Carica il DAG nel bucket del tuo ambiente
Cloud Composer pianifica i DAG che si trovano
/dags
cartella nel bucket dell'ambiente. Per caricare il DAG utilizzando
Console Google Cloud:
Sul computer locale, risparmia s3togcsoperator_tutorial.py.
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Si apre la cartella dei DAG del tuo ambiente.
Fai clic su Carica file.
Seleziona
s3togcsoperator_tutorial.py
sulla macchina locale e fai clic Apri.
Attiva il DAG
Nell'ambiente Cloud Composer, fai clic sulla scheda DAG.
Fai clic sull'ID DAG
s3_to_gcs_dag
.Fai clic su Attiva DAG.
Attendi circa 5-10 minuti finché non viene visualizzato un segno di spunta verde che indica sono state completate correttamente.
Convalida l'esito del DAG
Nella console Google Cloud, vai alla pagina BigQuery.
Nel riquadro Explorer, fai clic sul nome del tuo progetto.
Fai clic su
holidays_weather_joined
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna valore sono in decimi di grado Celsius.
Fai clic su
holidays_weather_normalized
.Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella sono in gradi Celsius.
Esegui la pulizia
Elimina le singole risorse che hai creato per questo tutorial:
Elimina il file
holidays.csv
nel tuo bucket AWS S3.Elimina il bucket AWS S3 che hai creato.
Elimina il bucket Cloud Storage che hai per questo tutorial.
Eliminare l'ambiente Cloud Composer, ad esempio manualmente il bucket dell'ambiente.