Questo tutorial mostra come utilizzare Dataflow SQL per unire un flusso di dati da Pub/Sub ai dati di una tabella BigQuery.
Obiettivi
In questo tutorial imparerai a:
- Scrivi una query SQL Dataflow che unisce i flussi di dati Pub/Sub ai dati della tabella BigQuery.
- Esegui il deployment di un job Dataflow dall'interfaccia utente di Dataflow SQL.
Costi
In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataflow
- Cloud Storage
- Pub/Sub
- Data Catalog
Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.
Prima di iniziare
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. .
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Imposta la variabile di ambiente
GOOGLE_APPLICATION_CREDENTIALS
sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile. -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.
-
Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. .
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Imposta la variabile di ambiente
GOOGLE_APPLICATION_CREDENTIALS
sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile. - Installare e inizializzare gcloud CLI. Scegli una delle
opzioni di installazione.
Potresti dover impostare la proprietà
project
sul progetto che stai utilizzando per questa procedura dettagliata. - Vai all'interfaccia utente web di Dataflow SQL nella console Google Cloud. Viene aperto il progetto a cui hai eseguito l'accesso più di recente. Per passare a un altro progetto, fai clic sul nome del progetto in alto nell'interfaccia utente web di Dataflow SQL e cerca il progetto che vuoi utilizzare.
Vai all'interfaccia utente web di Dataflow SQL
Crea origini di esempio
Se vuoi seguire l'esempio fornito in questo tutorial, crea le seguenti origini e utilizzale nei passaggi del tutorial.
- Un argomento Pub/Sub denominato
transactions
: un flusso di dati sulle transazioni che arrivano tramite una sottoscrizione all'argomento Pub/Sub. I dati di ogni transazione includono informazioni come il prodotto acquistato, il prezzo scontato, nonché la città e lo stato in cui è avvenuto l'acquisto. Dopo aver creato l'argomento Pub/Sub, puoi creare uno script che pubblica messaggi nel tuo argomento. Eseguirai questo script in una sezione successiva di questo tutorial. - Tabella BigQuery denominata
us_state_salesregions
: una tabella che fornisce una mappatura degli stati alle regioni di vendita. Prima di creare questa tabella, devi creare un set di dati BigQuery.
Assegna uno schema all'argomento Pub/Sub
L'assegnazione di uno schema consente di eseguire query SQL sui dati dell'argomento Pub/Sub. Attualmente, Dataflow SQL prevede che i messaggi in argomenti Pub/Sub vengano serializzati in formato JSON.
Per assegnare uno schema all'argomento Pub/Sub di esempio transactions
:
Crea un file di testo e assegnagli il nome
transactions_schema.yaml
. Copia e incolla il seguente testo dello schema intransactions_schema.yaml
.- column: event_timestamp description: Pub/Sub event timestamp mode: REQUIRED type: TIMESTAMP - column: tr_time_str description: Transaction time string mode: NULLABLE type: STRING - column: first_name description: First name mode: NULLABLE type: STRING - column: last_name description: Last name mode: NULLABLE type: STRING - column: city description: City mode: NULLABLE type: STRING - column: state description: State mode: NULLABLE type: STRING - column: product description: Product mode: NULLABLE type: STRING - column: amount description: Amount of transaction mode: NULLABLE type: FLOAT
Assegna lo schema utilizzando Google Cloud CLI.
a. Aggiorna gcloud CLI con il seguente comando. Assicurati che la versione gcloud CLId sia 242.0.0 o successiva.
gcloud components update
b. Esegui questo comando seguente in una finestra della riga di comando. Sostituisci project-id con l'ID progetto e path-to-file con il percorso del file
transactions_schema.yaml
.gcloud data-catalog entries update \ --lookup-entry='pubsub.topic.`project-id`.transactions' \ --schema-from-file=path-to-file/transactions_schema.yaml
Per ulteriori informazioni sui parametri del comando e sui formati di file di schema consentiti, consulta la pagina della documentazione relativa al aggiornamento delle voci di gcloud data-catalog.
c. Verifica che lo schema sia stato assegnato all'argomento Pub/Sub
transactions
. Sostituisci project-id con il tuo ID progetto.gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
Trova origini Pub/Sub
La UI di Dataflow SQL offre un modo per trovare gli oggetti dell'origine dati Pub/Sub per qualsiasi progetto a cui hai accesso, in modo da non dover ricordare il nome completo.
Per l'esempio in questo tutorial, accedi all'editor SQL di Dataflow e cerca l'argomento Pub/Sub transactions
che hai creato:
Vai all'area di lavoro SQL.
Nel riquadro Editor SQL Dataflow, cerca
projectid=project-id transactions
nella barra di ricerca. Sostituisci project-id con il tuo ID progetto.
Visualizza lo schema
- Nel riquadro Editor SQL Dataflow dell'interfaccia utente SQL di Dataflow, fai clic su transactions o cerca un argomento Pub/Sub digitando
projectid=project-id system=cloud_pubsub
e seleziona l'argomento. In Schema, puoi visualizzare lo schema assegnato all'argomento Pub/Sub.
Crea una query SQL
L'interfaccia utente SQL di Dataflow consente di creare query SQL per eseguire i job Dataflow.
La seguente query SQL è una query di arricchimento dei dati. Aggiunge un altro campo, sales_region
, al flusso di eventi Pub/Sub (transactions
), utilizzando una tabella BigQuery (us_state_salesregions
) che mappa gli stati alle regioni di vendita.
Copia e incolla questa query SQL in Editor query. Sostituisci project-id con il tuo ID progetto.
SELECT tr.*, sr.sales_region FROM pubsub.topic.`project-id`.transactions as tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code
Quando inserisci una query nell'interfaccia utente di Dataflow SQL, lo validator delle query verifica la sintassi della query. Se la query è valida, viene visualizzata un'icona con un segno di spunta verde. Se la query non è valida, viene visualizzata un'icona rossa con punto esclamativo. Se la sintassi della query non è valida, facendo clic sull'icona dello validator vengono fornite informazioni sugli aspetti da correggere.
Il seguente screenshot mostra la query valida nell'Editor di query. Lo strumento di convalida mostra un segno di spunta verde.
Crea un job Dataflow per eseguire la query SQL
Per eseguire la query SQL, crea un job Dataflow dall'interfaccia utente SQL di Dataflow.
In Editor di query, fai clic su Crea job.
Nel riquadro Crea job Dataflow che si apre:
- Per Destinazione, seleziona BigQuery.
- In ID set di dati, seleziona
dataflow_sql_tutorial
. - In Nome tabella, inserisci
sales
.
(Facoltativo) Dataflow sceglie automaticamente le impostazioni ottimali per il job SQL Dataflow, ma puoi espandere il menu Parametri facoltativi per specificare manualmente le seguenti opzioni della pipeline:
- Numero massimo di worker
- Zona
- Email dell'account di servizio
- Tipo di macchina
- Esperimenti aggiuntivi
- Configurazione dell'indirizzo IP del worker
- Rete
- Subnet
Fai clic su Crea. L'esecuzione del job Dataflow richiede qualche minuto.
Visualizza il job di Dataflow
Dataflow trasforma la tua query SQL in una pipeline Apache Beam. Fai clic su Visualizza job per aprire la UI web di Dataflow, dove puoi vedere una rappresentazione grafica della pipeline.
Per vedere un'analisi dettagliata delle trasformazioni che avvengono nella pipeline, fai clic sulle caselle. Ad esempio, se fai clic sulla prima casella nella rappresentazione grafica, etichettata come Esegui query SQL, viene visualizzata un'immagine che mostra le operazioni in corso.
Le prime due caselle rappresentano i due input che hai unito: l'argomento Pub/Sub transactions
e la tabella BigQuery, us_state_salesregions
.
Per visualizzare la tabella di output che contiene i risultati del job, vai alla UI di BigQuery.
Nel riquadro di esplorazione del progetto fai clic sul set di dati dataflow_sql_tutorial
che hai creato. Quindi, fai clic sulla tabella di output,
sales
. La scheda Anteprima mostra i contenuti della tabella di output.
Visualizza le offerte di lavoro precedenti e modifica le query
L'interfaccia utente di Dataflow archivia le query e i job precedenti nella pagina Job di Dataflow.
Puoi utilizzare l'elenco della cronologia dei job per visualizzare le query SQL precedenti. Ad esempio, vuoi modificare la query per aggregare le vendite per regione di vendita ogni 15 secondi. Utilizza la pagina Job per accedere al job in esecuzione avviato in precedenza nel tutorial, copia la query SQL ed esegui un altro job con una query modificata.
Nella pagina Job di Dataflow, fai clic sul job che vuoi modificare.
Nella pagina Dettagli job, nel riquadro Informazioni job, in Opzioni pipeline, individua la query SQL. Trova la riga relativa a queryString.
Copia e incolla la seguente query SQL nell'Editor SQL Dataflow nell'area di lavoro SQL per aggiungere finestre a scorrimento. Sostituisci project-id con il tuo ID progetto.
SELECT sr.sales_region, TUMBLE_START("INTERVAL 15 SECOND") AS period_start, SUM(tr.amount) as amount FROM pubsub.topic.`project-id`.transactions AS tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code GROUP BY sr.sales_region, TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
Fai clic su Crea job per creare un nuovo job con la query modificata.
Esegui la pulizia
Per evitare che al tuo account di fatturazione Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:
Interrompi lo script di pubblicazione di
transactions_injector.py
se è ancora in esecuzione.Arresta i job Dataflow in esecuzione. Vai alla UI web di Dataflow nella console Google Cloud.
Vai all'interfaccia utente web di Dataflow
Per ogni job creato dopo aver seguito questa procedura dettagliata, segui questi passaggi:
Fai clic sul nome del job.
Nella pagina Dettagli job, fai clic su Arresta. Viene visualizzata la finestra di dialogo Arresta job con le opzioni per l'interruzione del job.
Seleziona Annulla.
Fai clic su Arresta job. Il servizio interrompe appena possibile l'importazione e l'elaborazione dei dati. Poiché l'opzione Annulla interrompe immediatamente l'elaborazione, potresti perdere tutti i dati "in corso di elaborazione". L'arresto di un job potrebbe richiedere alcuni minuti.
Elimina il set di dati BigQuery. Vai alla UI web di BigQuery nella console Google Cloud.
Nel riquadro Explorer, nella sezione Risorse, fai clic sul set di dati dataflow_sql_tutorial che hai creato.
Nel riquadro dei dettagli, fai clic su Elimina. Si apre una finestra di dialogo di conferma.
Nella finestra di dialogo Elimina set di dati, digita
delete
per confermare l'eliminazione, quindi fai clic su Elimina.
Elimina l'argomento Pub/Sub. Vai alla pagina degli argomenti Pub/Sub nella console Google Cloud.
Vai alla pagina degli argomenti Pub/Sub
Seleziona l'argomento
transactions
.Fai clic su Elimina per eliminare definitivamente l'argomento. Si apre una finestra di dialogo di conferma.
Nella finestra di dialogo Elimina argomento, digita
delete
per confermare l'eliminazione, quindi fai clic su Elimina.Vai alla pagina delle sottoscrizioni Pub/Sub.
Seleziona eventuali abbonamenti rimanenti per
transactions
. Se i tuoi job non sono più in esecuzione, potrebbero non essere presenti abbonamenti.Fai clic su Elimina per eliminare definitivamente le iscrizioni. Nella finestra di dialogo di conferma, fai clic su Elimina.
Elimina il bucket gestione temporanea Dataflow in Cloud Storage. Vai alla pagina Bucket di Cloud Storage nella console Google Cloud.
Seleziona il bucket gestione temporanea Dataflow.
Fai clic su Elimina per eliminare definitivamente il bucket. Si apre una finestra di dialogo di conferma.
Nella finestra di dialogo Elimina bucket, digita
DELETE
per confermare il comando di eliminazione, quindi fai clic su Elimina.
Passaggi successivi
- Consulta un'introduzione a Dataflow SQL.
- Scopri di più sulle nozioni di base sulle pipeline di flusso.
- Esplora il riferimento SQL di Dataflow.
- Guarda la demo dell'analisi dei flussi di dati offerta a Cloud Next 2019.