Unione dei flussi di dati con Dataflow SQL


Questo tutorial mostra come utilizzare Dataflow SQL per unire un flusso di dati di Pub/Sub con i dati di una tabella BigQuery.

Obiettivi

In questo tutorial imparerai a:

  • Scrivi una query SQL Dataflow che unisce i flussi di dati Pub/Sub ai dati della tabella BigQuery.
  • Esegui il deployment di un job Dataflow dall'interfaccia utente SQL di Dataflow.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON di Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Abilita le API

  5. Crea un account di servizio:

    1. Nella console Google Cloud, vai alla pagina Crea account di servizio.

      Vai a Crea account di servizio
    2. Seleziona il progetto.
    3. Nel campo Nome account di servizio, inserisci un nome. La console Google Cloud compila il campo ID account di servizio in base a questo nome.

      Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio, Service account for quickstart.

    4. Fai clic su Crea e continua.
    5. Concedi il ruolo Project > Owner all'account di servizio.

      Per concedere il ruolo, trova l'elenco Seleziona un ruolo e scegli Project > Owner.

    6. Fai clic su Continua.
    7. Fai clic su Fine per completare la creazione dell'account di servizio.

      Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

  6. Crea una chiave dell'account di servizio:

    1. Nella console Google Cloud, fai clic sull'indirizzo email dell'account di servizio che hai creato.
    2. Fai clic su Chiavi.
    3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
    4. Fai clic su Crea. Un file della chiave JSON viene scaricato sul computer.
    5. Fai clic su Chiudi.
  7. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  8. Nella pagina del selettore di progetti della console Google Cloud, seleziona o crea un progetto Google Cloud.

    Vai al selettore progetti

  9. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  10. Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON di Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Abilita le API

  11. Crea un account di servizio:

    1. Nella console Google Cloud, vai alla pagina Crea account di servizio.

      Vai a Crea account di servizio
    2. Seleziona il progetto.
    3. Nel campo Nome account di servizio, inserisci un nome. La console Google Cloud compila il campo ID account di servizio in base a questo nome.

      Nel campo Descrizione account di servizio, inserisci una descrizione. Ad esempio, Service account for quickstart.

    4. Fai clic su Crea e continua.
    5. Concedi il ruolo Project > Owner all'account di servizio.

      Per concedere il ruolo, trova l'elenco Seleziona un ruolo e scegli Project > Owner.

    6. Fai clic su Continua.
    7. Fai clic su Fine per completare la creazione dell'account di servizio.

      Non chiudere la finestra del browser. La utilizzerai nel passaggio successivo.

  12. Crea una chiave dell'account di servizio:

    1. Nella console Google Cloud, fai clic sull'indirizzo email dell'account di servizio che hai creato.
    2. Fai clic su Chiavi.
    3. Fai clic su Aggiungi chiave, quindi su Crea nuova chiave.
    4. Fai clic su Crea. Un file della chiave JSON viene scaricato sul computer.
    5. Fai clic su Chiudi.
  13. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  14. Installa e inizializza gcloud CLI. Scegli una delle opzioni di installazione. Potrebbe essere necessario impostare la proprietà project sul progetto che utilizzi per questa procedura dettagliata.
  15. Vai all'interfaccia utente web di Dataflow SQL nella console Google Cloud. Viene aperto il progetto a cui hai eseguito l'accesso più di recente. Per passare a un altro progetto, fai clic sul nome del progetto nella parte superiore dell'interfaccia utente web di Dataflow SQL e cerca il progetto che vuoi utilizzare.
    Vai all'interfaccia utente web di Dataflow SQL

Crea origini di esempio

Se vuoi seguire l'esempio fornito in questo tutorial, crea le seguenti origini e utilizzale nei passaggi del tutorial.

  • Un argomento Pub/Sub denominato transactions: un flusso di dati sulle transazioni che arriva tramite una sottoscrizione all'argomento Pub/Sub. I dati di ogni transazione includono informazioni quali il prodotto acquistato, il prezzo scontato e la città e la provincia in cui è avvenuto l'acquisto. Dopo aver creato l'argomento Pub/Sub, crei uno script che pubblica messaggi nell'argomento. Eseguirai questo script in una sezione successiva di questo tutorial.
  • Una tabella BigQuery denominata us_state_salesregions: una tabella che fornisce una mappatura degli stati alle regioni di vendita. Prima di creare questa tabella, devi creare un set di dati BigQuery.

Assegna uno schema all'argomento Pub/Sub

L'assegnazione di uno schema ti consente di eseguire query SQL sui dati dell'argomento Pub/Sub. Attualmente, Dataflow SQL prevede che i messaggi in argomenti Pub/Sub siano serializzati in formato JSON.

Per assegnare uno schema all'argomento Pub/Sub di esempio transactions:

  1. Crea un file di testo e assegnagli il nome transactions_schema.yaml. Copia e incolla il seguente testo dello schema in transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Assegna lo schema utilizzando Google Cloud CLI.

    a. Aggiorna gcloud CLI con il comando seguente. Assicurati che la versione gcloud CLI sia 242.0.0 o successiva.

      gcloud components update
    

    b. Esegui il seguente comando in una finestra della riga di comando. Sostituisci project-id con l'ID progetto e path-to-file con il percorso del file transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Per saperne di più sui parametri del comando e sui formati consentiti dei file di schema, consulta la pagina della documentazione per l'aggiornamento delle voci gcloud data-catalog.

    c. Verifica che lo schema sia stato assegnato correttamente all'argomento transactions Pub/Sub. Sostituisci project-id con l'ID progetto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Trova origini Pub/Sub

La UI SQL di Dataflow consente di trovare gli oggetti delle origini dati Pub/Sub per qualsiasi progetto a cui hai accesso, in modo da non dover ricordare i loro nomi completi.

Per l'esempio di questo tutorial, vai all'editor SQL di Dataflow e cerca l'argomento transactions Pub/Sub che hai creato:

  1. Vai all'area di lavoro SQL.

  2. Nel riquadro Editor SQL di Dataflow, cerca projectid=project-id transactions nella barra di ricerca. Sostituisci project-id con l'ID progetto.

    Riquadro di ricerca di Data Catalog nell'area di lavoro SQL di Dataflow.

Visualizza lo schema

  1. Nel riquadro Editor SQL Dataflow dell'interfaccia utente di Dataflow SQL, fai clic su Transazioni o cerca un argomento Pub/Sub digitando projectid=project-id system=cloud_pubsub e seleziona l'argomento.
  2. In Schema, puoi visualizzare lo schema assegnato all'argomento Pub/Sub.

    Schema assegnato all'argomento con elenco dei nomi dei campi e relative descrizioni.

crea una query SQL

L'interfaccia utente SQL di Dataflow consente di creare query SQL per eseguire i job Dataflow.

La seguente query SQL è una query di arricchimento dei dati. Aggiunge un ulteriore campo, sales_region, al flusso di eventi Pub/Sub (transactions), utilizzando una tabella BigQuery (us_state_salesregions) che mappa gli stati alle regioni di vendita.

Copia e incolla la seguente query SQL nell'Editor query. Sostituisci project-id con l'ID progetto.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Quando inserisci una query nell'interfaccia utente SQL di Dataflow, lo validator delle query verifica la sintassi delle query. Se la query è valida, viene visualizzato un segno di spunta verde. Se la query non è valida, viene visualizzata un'icona a forma di punto esclamativo rosso. Se la sintassi della query non è valida, fai clic sull'icona dello validator per ottenere informazioni sulle operazioni da correggere.

Il seguente screenshot mostra la query valida nell'Editor query. Lo strumento di convalida mostra un segno di spunta verde.

Area di lavoro SQL di Dataflow con la query del tutorial visibile nell'editor.

Crea un job Dataflow per eseguire la query SQL

Per eseguire la query SQL, crea un job Dataflow dall'interfaccia utente SQL di Dataflow.

  1. Nell'Editor query, fai clic su Crea job.

  2. Nel riquadro Crea job Dataflow che si apre:

    • In corrispondenza di Destinazione, seleziona BigQuery.
    • In ID set di dati, seleziona dataflow_sql_tutorial.
    • In Nome tabella, inserisci sales.
    Crea il modulo del job SQL di Dataflow.
  3. (Facoltativo) Dataflow sceglie automaticamente le impostazioni ottimali per il job SQL di Dataflow, ma puoi espandere il menu Parametri facoltativi per specificare manualmente le seguenti opzioni della pipeline:

    • Numero massimo di worker
    • Zona
    • Email dell'account di servizio
    • Tipo di macchina
    • Esperimenti aggiuntivi
    • Configurazione degli indirizzi IP del worker
    • Rete
    • Subnet
  4. Fai clic su Crea. L'esecuzione del job Dataflow richiede alcuni minuti.

Visualizza il job di Dataflow

Dataflow trasforma la tua query SQL in una pipeline Apache Beam. Fai clic su Visualizza job per aprire l'interfaccia utente web di Dataflow, dove puoi vedere una rappresentazione grafica della pipeline.

Pipeline della query SQL mostrata nella UI web di Dataflow.

Per un'analisi delle trasformazioni avvenute nella pipeline, fai clic sulle caselle. Ad esempio, se fai clic sulla prima casella nella rappresentazione grafica, denominata Esegui query SQL, viene visualizzato un grafico che mostra le operazioni che avvengono dietro le quinte.

Le prime due caselle rappresentano i due input che hai unito: l'argomento Pub/Sub, transactions, e la tabella BigQuery, us_state_salesregions.

La scrittura dell'output di un join di due input viene completata in 25 secondi.

Per visualizzare la tabella di output contenente i risultati del job, vai alla UI di BigQuery. Nel riquadro Explorer, nel progetto, fai clic sul set di dati dataflow_sql_tutorial che hai creato. Poi, fai clic sulla tabella di output sales. La scheda Anteprima mostra i contenuti della tabella di output.

La tabella di anteprima delle vendite contiene le colonne tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

Visualizza job passati e modifica le query

L'interfaccia utente di Dataflow archivia i job e le query precedenti nella pagina Job di Dataflow.

Puoi utilizzare l'elenco della cronologia dei job per visualizzare le query SQL precedenti. Ad esempio, vuoi modificare la query per aggregare le vendite per regione di vendita ogni 15 secondi. Utilizza la pagina Job per accedere al job in esecuzione avviato in precedenza nel tutorial, copiare la query SQL ed eseguire un altro job con una query modificata.

  1. Nella pagina Job di Dataflow, fai clic sul job da modificare.

  2. Nella pagina Dettagli job, nel riquadro Informazioni job, in Opzioni della pipeline, individua la query SQL. Trova la riga per queryString.

    L'opzione della pipeline del job denominata queryString.
  3. Copia e incolla la seguente query SQL nell'editor SQL di Dataflow nell'area di lavoro SQL per aggiungere finestre cadenti. Sostituisci project-id con l'ID progetto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Fai clic su Crea job per creare un nuovo job con la query modificata.

Esegui la pulizia

Per evitare che al tuo account di fatturazione Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial:

  1. Interrompi lo script di pubblicazione transactions_injector.py se è ancora in esecuzione.

  2. Arresta i job Dataflow in esecuzione. Vai all'interfaccia utente web di Dataflow nella console Google Cloud.

    Vai all'interfaccia utente web di Dataflow

    Per ogni job creato dopo questa procedura dettagliata, esegui questi passaggi:

    1. Fai clic sul nome del job.

    2. Nella pagina Dettagli job, fai clic su Arresta. Viene visualizzata la finestra di dialogo Arresta job con le opzioni su come arrestare il job.

    3. Seleziona Annulla.

    4. Fai clic su Interrompi job. Il servizio interrompe l'importazione e l'elaborazione dei dati il prima possibile. Poiché Annulla interrompe immediatamente l'elaborazione, potresti perdere i dati "in corso". L'arresto di un job potrebbe richiedere alcuni minuti.

  3. Elimina il set di dati BigQuery. Vai all'interfaccia utente web di BigQuery nella console Google Cloud.

    Vai all'interfaccia utente web di BigQuery

    1. Nel riquadro Explorer, nella sezione Risorse, fai clic sul set di dati dataflow_sql_tutorial che hai creato.

    2. Nel riquadro dei dettagli, fai clic su Elimina. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina set di dati, conferma il comando di eliminazione digitando delete, quindi fai clic su Elimina.

  4. Eliminare l'argomento Pub/Sub. Vai alla pagina degli argomenti Pub/Sub nella console Google Cloud.

    Vai alla pagina degli argomenti Pub/Sub

    1. Seleziona l'argomento transactions.

    2. Fai clic su Elimina per eliminare definitivamente l'argomento. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina argomento, conferma il comando di eliminazione digitando delete, quindi fai clic su Elimina.

    4. Vai alla pagina delle sottoscrizioni Pub/Sub.

    5. Seleziona eventuali abbonamenti rimanenti a transactions. Se i tuoi job non sono più in esecuzione, potrebbero non esserci abbonamenti.

    6. Fai clic su Elimina per eliminare definitivamente gli abbonamenti. Nella finestra di dialogo di conferma, fai clic su Elimina.

  5. Elimina il bucket gestione temporanea di Dataflow in Cloud Storage. Vai alla pagina Bucket di Cloud Storage nella console Google Cloud.

    Vai a Bucket

    1. Seleziona il bucket gestione temporanea Dataflow.

    2. Fai clic su Elimina per eliminare definitivamente il bucket. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina bucket, conferma il comando di eliminazione digitando DELETE, quindi fai clic su Elimina.

Passaggi successivi