Unione dei flussi di dati con Dataflow SQL


Questo tutorial mostra come utilizzare Dataflow SQL per unire un flusso di dati da in Pub/Sub con i dati di una tabella BigQuery.

Obiettivi

In questo tutorial imparerai a:

  • Scrivi una query SQL Dataflow che unisce i flussi di dati Pub/Sub con dati della tabella BigQuery.
  • Esegui il deployment di un job Dataflow dall'interfaccia utente di Dataflow SQL.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. .

    Abilita le API

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  10. Abilita le API Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. .

    Abilita le API

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS sul percorso del file JSON che contiene le tue credenziali. Questa variabile si applica solo alla sessione di shell attuale. Pertanto, se apri una nuova sessione, imposta di nuovo la variabile.

  14. Installare e inizializzare gcloud CLI. Scegli una delle opzioni di installazione. Potresti dover impostare la proprietà project al progetto che usi per questa procedura dettagliata.
  15. Vai all'interfaccia utente web di Dataflow SQL nella console Google Cloud. Questo apre il progetto a cui hai eseguito l'accesso più di recente. Per passare a un'altra progetto, fai clic sul nome del progetto nella parte superiore della nell'interfaccia utente web di Dataflow SQL e cerca il progetto che vuoi utilizzare.
    Vai al file SQL di Dataflow UI web

Crea origini di esempio

Se vuoi seguire l'esempio fornito in questo tutorial, crea il le seguenti fonti e usale nei passaggi del tutorial.

  • Un argomento Pub/Sub chiamato transactions: uno stream di Dati sulle transazioni che arrivano tramite una sottoscrizione a Pub/Sub. per ogni argomento. I dati di ogni transazione includono informazioni come il prodotto acquistato, il prezzo scontato, nonché la città e lo stato in cui si è verificato un errore. Dopo aver creato l'argomento Pub/Sub, puoi creare uno script che pubblica messaggi nel tuo argomento. Eseguirai questo script in un secondo di questo tutorial.
  • Una tabella BigQuery chiamata us_state_salesregions - A che fornisce una mappatura degli stati alle regioni di vendita. Prima di crearlo devi creare un set di dati BigQuery.

Assegna uno schema all'argomento Pub/Sub

L'assegnazione di uno schema consente di eseguire query SQL sull'argomento Pub/Sub e i dati di Google Cloud. Attualmente, Dataflow SQL prevede che i messaggi vengano Argomenti Pub/Sub da serializzare in formato JSON.

Per assegnare uno schema argomento Pub/Sub di esempio transactions:

  1. Crea un file di testo e assegnagli il nome transactions_schema.yaml. Copia e incolla il seguente testo dello schema in transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Assegna lo schema utilizzando Google Cloud CLI.

    a. Aggiorna gcloud CLI con seguente comando. Assicurati che la versione della gcloud CLI è 242.0.0 o successiva.

      gcloud components update
    

    b. Esegui l' comando seguente in un della riga di comando. Sostituisci project-id con il tuo ID progetto e path-to-file con il percorso di transactions_schema.yaml .

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Per ulteriori informazioni sui parametri del comando e sul file di schema consentito formati, consulta la pagina della documentazione per gcloud data-catalog voci update.

    c. Verifica che lo schema sia stato assegnato correttamente a transactions Pub/Sub. Sostituisci project-id con il tuo progetto ID.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Trova origini Pub/Sub

L'interfaccia utente di Dataflow SQL consente di trovare gli oggetti dell'origine dati Pub/Sub per qualsiasi progetto a cui hai accesso, in modo da non doverne ricordare il nome completo.

Per l'esempio di questo tutorial, vai all'editor SQL di Dataflow e cerca Pub/Sub transactions creato:

  1. Vai all'area di lavoro SQL.

  2. Nel riquadro Editor SQL Dataflow, cerca projectid=project-id transactions nella barra di ricerca. Sostituisci project-id con il tuo ID progetto.

    Riquadro di ricerca di Data Catalog nell'area di lavoro SQL di Dataflow.

Visualizza lo schema

  1. Nel riquadro Editor SQL Dataflow dell'interfaccia utente di Dataflow SQL, fai clic su transactions o cerca un argomento Pub/Sub digitando projectid=project-id system=cloud_pubsub e seleziona l'argomento.
  2. In Schema puoi visualizzare lo schema assegnato al Pub/Sub.

    Schema assegnato all'argomento, che include l'elenco dei nomi dei campi e le relative descrizioni.

Crea una query SQL

L'interfaccia utente SQL di Dataflow consente di creare query SQL per eseguire di job Dataflow.

La seguente query SQL è una query di arricchimento dei dati. Aggiunge un altro campo, sales_region, al flusso di eventi Pub/Sub (transactions), utilizzando una tabella BigQuery (us_state_salesregions) che mappa gli stati alle regioni di vendita.

Copia e incolla questa query SQL in Editor query. Sostituisci project-id con il tuo ID progetto.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Quando inserisci una query nell'interfaccia utente SQL di Dataflow, strumento di convalida verifica la sintassi della query. R Se la query è valida, viene visualizzata un'icona con un segno di spunta verde. Se la query è non valida, viene visualizzata un'icona rossa con punto esclamativo. Se la sintassi della query è non valido, facendo clic sull'icona dello strumento di convalida vengono fornite informazioni su cosa che devi correggere.

Il seguente screenshot mostra la query valida nell'Editor di query. La di convalida mostra un segno di spunta verde.

Area di lavoro SQL di Dataflow con la query del tutorial visibile nell'editor.

Crea un job Dataflow per eseguire la query SQL

Per eseguire la query SQL, crea un job Dataflow dal UI di Dataflow SQL.

  1. In Editor di query, fai clic su Crea job.

  2. Nel riquadro Crea job Dataflow che si apre:

    • Per Destinazione, seleziona BigQuery.
    • In ID set di dati, seleziona dataflow_sql_tutorial.
    • In Nome tabella, inserisci sales.
    Crea il modulo del job SQL di Dataflow.
  3. (Facoltativo) Dataflow sceglie automaticamente le impostazioni che sono ottimale per il tuo job SQL di Dataflow, ma puoi espandere Menu Parametri facoltativi per specificare manualmente quanto segue. opzioni pipeline:

    • Numero massimo di worker
    • Zona
    • Email dell'account di servizio
    • Tipo di macchina
    • Esperimenti aggiuntivi
    • Configurazione dell'indirizzo IP del worker
    • Rete
    • Subnet
  4. Fai clic su Crea. Il job Dataflow richiede alcuni minuti per iniziare l'esecuzione.

Visualizza il job di Dataflow

Dataflow trasforma la tua query SQL in una pipeline Apache Beam. Fai clic su Visualizza job per aprirlo la UI web di Dataflow, dove puoi vedere una rappresentazione grafica della pipeline.

Pipeline da query SQL mostrata nella UI web di Dataflow.

Per visualizzare un'analisi dettagliata delle trasformazioni che si verificano della pipeline, fai clic sulle caselle. Ad esempio, se fai clic sulla prima casella nella Esegui query SQL, viene visualizzata un'immagine che mostra le operazioni che si svolgono in background.

Le prime due caselle rappresentano i due input che hai unito: argomento Pub/Sub, transactions e BigQuery tabella us_state_salesregions.

L'output di scrittura di un join di due input viene completato in 25 secondi.

Per visualizzare la tabella di output che contiene i risultati del job, vai alla UI di BigQuery. Nel riquadro Explorer, nel tuo progetto, fai clic su il set di dati dataflow_sql_tutorial che hai creato. Quindi, fai clic sulla tabella di output sales. La scheda Anteprima mostra i contenuti della tabella di output.

La tabella di anteprima delle vendite contiene le colonne per tr_time_str, first_name, last_name, city, state, product, amount e sale_region.

Visualizza le offerte di lavoro precedenti e modifica le query

L'interfaccia utente di Dataflow archivia le query e i job precedenti nella pagina Job di Dataflow.

Puoi utilizzare l'elenco della cronologia dei job per visualizzare le query SQL precedenti. Ad esempio, vuoi modificare la query per aggregare le vendite per regione ogni 15 secondi. Utilizza la pagina Job per accedere al job in esecuzione avviato in precedenza nella copia la query SQL ed esegui un altro job con una query modificata.

  1. Nella pagina Job di Dataflow, fai clic sul job che ti interessa modifica.

  2. Nella pagina Dettagli job, nel riquadro Informazioni job, in Pipeline opzioni, individua la query SQL. Trova la riga relativa a queryString.

    L'opzione della pipeline del job denominata queryString.
  3. Copia e incolla questa query SQL nell'editor SQL Dataflow della Area di lavoro SQL da aggiungere finestre scorrevoli. Sostituisci project-id con il tuo ID progetto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Fai clic su Crea job per creare un nuovo job con la query modificata.

Esegui la pulizia

Per evitare che al tuo account di fatturazione Cloud vengano addebitati costi relativi alle risorse utilizzate. in questo tutorial:

  1. Interrompi lo script di pubblicazione di transactions_injector.py, se è ancora in esecuzione.

  2. Arresta i job Dataflow in esecuzione. Vai alla sezione UI web di Dataflow nella console Google Cloud.

    Vai all'interfaccia utente web di Dataflow

    Per ogni job creato dopo aver seguito questa procedura dettagliata, segui questi passaggi passaggi:

    1. Fai clic sul nome del job.

    2. Nella pagina Dettagli job, fai clic su Arresta. La Viene visualizzata la finestra di dialogo Arresta job con le opzioni per l'interruzione del job.

    3. Seleziona Annulla.

    4. Fai clic su Arresta job. Il servizio interrompe l'importazione e l'elaborazione dei dati il prima possibile. Poiché l'opzione Annulla interrompe immediatamente l'elaborazione, potrebbero perdere i dati "in-flight" e i dati di Google Cloud. L'arresto di un job potrebbe richiedere alcuni minuti.

  3. Elimina il set di dati BigQuery. Vai alla sezione UI web di BigQuery nella console Google Cloud.

    Vai alla UI web di BigQuery

    1. Nel riquadro Explorer, nella sezione Risorse, fai clic sul Set di dati dataflow_sql_tutorial che hai creato.

    2. Nel riquadro dei dettagli, fai clic su Elimina. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina set di dati, digita per confermare il comando di eliminazione delete, quindi fai clic su Elimina.

  4. Elimina l'argomento Pub/Sub. Vai a Pub/Sub nella console Google Cloud.

    Vai alla pagina degli argomenti Pub/Sub

    1. Seleziona l'argomento transactions.

    2. Fai clic su Elimina per eliminare definitivamente l'argomento. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina argomento, digita per confermare il comando di eliminazione delete, quindi fai clic su Elimina.

    4. Vai alla pagina delle sottoscrizioni Pub/Sub.

    5. Seleziona eventuali abbonamenti rimanenti per transactions. Se i tuoi job non sono più in esecuzione, potrebbero non esserci abbonamenti.

    6. Fai clic su Elimina per eliminare definitivamente le iscrizioni. Nella finestra di dialogo di conferma, fai clic su Elimina.

  5. Elimina il bucket gestione temporanea Dataflow in Cloud Storage. Vai alla pagina Bucket di Cloud Storage nella console Google Cloud.

    Vai a Bucket

    1. Seleziona il bucket gestione temporanea Dataflow.

    2. Fai clic su Elimina per eliminare definitivamente il bucket. Si apre una finestra di dialogo di conferma.

    3. Nella finestra di dialogo Elimina bucket, conferma il comando di eliminazione digitando DELETE, quindi fai clic su Elimina.

Passaggi successivi