Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In questa pagina viene descritto come utilizzare DataflowTemplateOperator
per l'avvio
Pipeline Dataflow da
con Cloud Composer.
La pipeline Testo di Cloud Storage in BigQuery è una pipeline batch che ti consente di caricare i file di testo archiviati in Cloud Storage, trasformarli utilizzando una funzione JavaScript definita dall'utente (UDF) fornita e recapitare i risultati in BigQuery.
Panoramica
Prima di avviare il flusso di lavoro, creerai le seguenti entità:
Una tabella BigQuery vuota di un set di dati vuoto che conterrà le seguenti colonne di informazioni:
location
,average_temperature
,month
e, facoltativamente,inches_of_rain
,is_current
elatest_measurement
.Un file JSON che normalizzerà i dati di
.txt
nel formato corretto per il parametro . L'oggetto JSON avrà un array diBigQuery Schema
, dove ogni oggetto conterrà il nome di una colonna, il tipo di input e se si tratta o meno di un campo obbligatorio.Un file
.txt
di input che conterrà i dati che verranno caricati collettivamente alla tabella BigQuery.Una funzione definita dall'utente scritta in JavaScript che trasformerà ciascuna riga del file
.txt
nelle variabili pertinenti per la tabella.Un file DAG Airflow che punterà alla posizione di questi .
Successivamente, caricherai il file
.txt
, il file delle funzioni definite dall'utente.js
e lo schema.json
in un bucket Cloud Storage. Caricherai inoltre il DAG nel tuo ambiente Cloud Composer.Dopo aver caricato il DAG, Airflow eseguirà un'attività. Questa attività avviare una pipeline Dataflow che applicherà funzione definita dall'utente al file
.txt
e formattarla in base alla Schema JSON.Infine, i dati verranno caricati nella tabella BigQuery che hai creato in precedenza.
Prima di iniziare
- Questa guida richiede familiarità con JavaScript per scrivere la funzione definita dall'utente.
- Questa guida presuppone che tu abbia già un ambiente Cloud Composer. Per crearne uno, consulta Creare un ambiente. Puoi utilizzare qualsiasi versione di Cloud Composer con questa guida.
-
Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.
Crea una tabella BigQuery vuota con una definizione di schema
Crea una tabella BigQuery con una definizione di schema. Utilizzerai questa definizione di schema più avanti in questa guida. Questo La tabella BigQuery conterrà i risultati del caricamento in batch.
Per creare una tabella vuota con una definizione dello schema:
Console
Nella console Google Cloud, vai a BigQuery pagina:
Nel pannello di navigazione, nella sezione Risorse, espandi le progetto.
Nel riquadro dei dettagli, fai clic su Crea set di dati.
Nella pagina Crea set di dati, nella sezione ID set di dati, assegna un nome Set di dati
average_weather
. Lascia invariati i valori predefiniti di tutti gli altri campi. stato.Fai clic su Crea set di dati.
Torna al pannello di navigazione, nella sezione Risorse ed espandi del tuo progetto. Quindi, fai clic sul set di dati
average_weather
.Nel riquadro dei dettagli, fai clic su Crea tabella.
Nella pagina Crea tabella, nella sezione Origine, seleziona Tabella vuota.
Nella sezione Destinazione della pagina Crea tabella:
Per Nome set di dati, scegli il set di dati
average_weather
.Nel campo Nome tabella, inserisci il nome
average_weather
.Verifica che l'opzione Tipo di tabella sia impostata su Tabella nativa.
Nella sezione Schema, inserisci lo schema. definizione di Kubernetes. Puoi utilizzare uno dei seguenti approcci:
Inserisci manualmente le informazioni sullo schema attivando Modifica come testo e inserendo lo schema della tabella come array JSON. Digita quanto segue campi:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Utilizza Aggiungi campo per inserire manualmente lo schema:
Per Impostazioni di partizionamento e clustering, lascia il valore predefinito
No partitioning
.Nella sezione Opzioni avanzate, lascia il valore predefinito
Google-managed key
per Crittografia.Fai clic su Crea tabella.
bq
Utilizza il comando bq mk
per creare un set di dati vuoto e una tabella in questo set di dati.
Esegui il seguente comando per creare un set di dati sul clima globale medio:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Sostituisci quanto segue:
LOCATION
: la regione in cui si trova l'ambiente.PROJECT_ID
: l'ID del progetto.
Esegui questo comando per creare una tabella vuota in questo set di dati con la definizione dello schema:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Una volta creata la tabella, puoi aggiornare la scadenza, la descrizione e le etichette della tabella. Puoi anche modificare la definizione dello schema.
Python
Salva questo codice come
dataflowtemplateoperator_create_dataset_and_table_helper.py
e aggiorna le variabili in modo che riflettano il progetto e la località, quindi
eseguilo con il seguente comando:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Per autenticarti a Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.
Crea un bucket Cloud Storage
Crea un bucket per contenere tutti i file necessari per il flusso di lavoro. Il DAG che crei più avanti in questa guida farà riferimento ai file che carichi in questo bucket di archiviazione. Per creare un nuovo bucket di archiviazione:
Console
Apri Cloud Storage nella console Google Cloud.
Fai clic su Crea bucket per aprire il modulo di creazione del bucket.
Inserisci le informazioni sul bucket e fai clic su Continua per completare ogni passaggio:
Specifica un nome globalmente univoco per il tuo bucket. Questa guida utilizza
bucketName
come esempio.Seleziona Regione come tipo di località. Quindi, seleziona un'opzione Località in cui verranno archiviati i dati del bucket.
Seleziona Standard come classe di archiviazione predefinita per i tuoi dati.
Seleziona il controllo dell'accesso Uniforme per accedere ai tuoi oggetti.
Fai clic su Fine.
gcloud
Usa il comando gcloud storage buckets create
:
gcloud storage buckets create gs://bucketName/
Sostituisci quanto segue:
bucketName
: il nome del bucket che hai creato in precedenza in questa guida.
Esempi di codice
C#
Per autenticarti a Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.
Go
Per eseguire l'autenticazione su Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.
Java
Per autenticarti a Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.
Python
Per autenticarti a Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Ruby
Per eseguire l'autenticazione su Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Crea uno schema BigQuery in formato JSON per la tabella di output
Crea un file di schema BigQuery in formato JSON che corrisponda alla tabella di output creata in precedenza. Tieni presente che i nomi, i tipi e le modalità dei campi
devono corrispondere a quelli definiti in precedenza nella tabella BigQuery
. Questo file normalizzerà i dati del file .txt
in un formato compatibile con lo schema BigQuery. Assegna un nome al file
jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Creare un file JavaScript per formattare i dati
In questo file, dovrai definire la funzione UDF (User Defined Function) che fornisce la logica per trasformare le righe di testo nel file di input. Tieni presente che
prende ciascuna riga di testo nel file di input come argomento separato,
la funzione verrà eseguita una volta per ogni riga del file di input. Assegna un nome a questo file
transformCSVtoJSON.js
.
Crea il file di input
Questo file conterrà le informazioni che vuoi caricare sul tuo
Tabella BigQuery. Copia questo file localmente e assegnagli il nome
inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Carica i file nel bucket
Carica i seguenti file nel bucket Cloud Storage che hai creato in precedenza:
- Schema BigQuery in formato JSON (
.json
) - Funzione definita dall'utente JavaScript (
transformCSVtoJSON.js
) Il file di input del testo che vuoi elaborare (
.txt
)
Console
- Nella console Google Cloud, vai alla pagina Bucket di Cloud Storage.
Nell'elenco dei bucket, fai clic sul tuo bucket.
Nella scheda Oggetti del bucket, esegui una delle seguenti operazioni:
Trascina i file desiderati dal desktop o da Gestione file. al riquadro principale della console Google Cloud.
Fai clic sul pulsante Carica file e seleziona i file che vuoi carica nella finestra di dialogo visualizzata e fai clic su Apri.
gcloud
Esegui il comando gcloud storage cp
:
gcloud storage cp OBJECT_LOCATION gs://bucketName
Sostituisci quanto segue:
bucketName
: il nome del bucket che hai creato in precedenza in questa guida.OBJECT_LOCATION
: il percorso locale dell'oggetto. Ad esempio:Desktop/transformCSVtoJSON.js
.
Esempi di codice
Python
Per autenticarti a Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Ruby
Per eseguire l'autenticazione su Cloud Composer, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.
Configura DataflowTemplateOperator
Prima di eseguire il DAG, imposta le seguenti variabili Airflow.
Variabile Airflow | Valore |
---|---|
project_id
|
L'ID del progetto |
gce_zone
|
Zona Compute Engine in cui deve essere creato il cluster Dataflow |
bucket_path
|
La località del bucket Cloud Storage che hai creato precedenti |
Ora farai riferimento ai file creati in precedenza per creare un DAG che avvia
dal flusso di lavoro di Dataflow. Copia questo DAG e salvalo localmente come composer-dataflow-dag.py
.
Carica il DAG in Cloud Storage
Carica il DAG nella cartella /dags
della cartella
di sincronizzare la directory di una VM
con un bucket. Una volta completato il caricamento, puoi visualizzarlo facendo clic sul link Cartella DAG nella pagina degli ambienti Cloud Composer.
Visualizzare lo stato dell'attività
- Vai all'interfaccia web di Airflow.
- Nella pagina DAG, fai clic sul nome del DAG (ad es.
composerDataflowDAG
). - Nella pagina Dettagli DAG, fai clic su Visualizzazione grafico.
Verifica lo stato:
Failed
: l'attività è racchiusa in una casella rossa. Puoi anche tenere premuto il cursore sopra l'attività e cercare Stato: non riuscito.Success
: l'attività è racchiusa in una casella verde. Puoi anche tenere premuto il cursore sopra l'attività e controllare se è visualizzato lo stato Stato: completata.
Dopo alcuni minuti, puoi controllare i risultati in Dataflow in BigQuery.
Visualizza il job in Dataflow
Nella console Google Cloud, vai alla pagina Dataflow.
Il tuo job è denominato
dataflow_operator_transform_csv_to_bq
con un ID univoco alla fine del nome con un trattino, in questo modo:Fai clic sul nome per visualizzare dettagli offerta di lavoro.
Visualizza i risultati in BigQuery
Nella console Google Cloud, vai alla pagina BigQuery.
Puoi inviare query utilizzando SQL standard. Utilizza la seguente query per visualizzare le righe aggiunte alla tabella:
SELECT * FROM projectId.average_weather.average_weather