Creare una pipeline di inserimento flussi utilizzando un modello Dataflow

Questa guida rapida mostra come creare una pipeline di inserimento flussi utilizzando un Modello Dataflow fornito da Google. Nello specifico, questa guida rapida utilizza il modello Da Pub/Sub a BigQuery come esempio.

Il modello da Pub/Sub a BigQuery è un pipeline di flusso in grado di leggere messaggi in formato JSON da una Pub/Sub e scrivili in una tabella BigQuery.


Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. Crea un bucket Cloud Storage:
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  9. Copia quanto segue, poiché ti serviranno in una sezione successiva:
  10. Per completare i passaggi della guida rapida, il tuo account utente deve disporre Ruolo Amministratore Dataflow e il ruolo Utente account di servizio. L'account di servizio predefinito di Compute Engine deve avere il ruolo Worker Dataflow. Per aggiungere i ruoli richiesti nella console Google Cloud:

    1. Vai alla pagina IAM.
      Vai a IAM
    2. Seleziona il progetto.
    3. Nella riga contenente il tuo account utente, fai clic su Modifica entità e fai clic su Aggiungi un altro ruolo.
    4. Fai clic su Aggiungi un altro ruolo e nell'elenco a discesa seleziona Amministratore Dataflow.
    5. Fai clic su Aggiungi un altro ruolo e poi seleziona Service Account User (Utente account di servizio) dall'elenco a discesa.
    6. Fai clic su Salva.
    7. Nella riga contenente l'account di servizio predefinito di Compute Engine, fai clic su Modifica entità.
    8. Fai clic su Aggiungi un altro ruolo e nell'elenco a discesa seleziona Worker Dataflow.
    9. Fai clic su Aggiungi un altro ruolo e poi dall'elenco a discesa, seleziona Editor Pub/Sub.
    10. Fai clic su Aggiungi un altro ruolo e poi Nell'elenco a discesa, seleziona Editor dati BigQuery.
    11. Fai clic su Salva.

      Per ulteriori informazioni sulla concessione dei ruoli, consulta Concedere un ruolo IAM utilizzando la console.

  11. Per impostazione predefinita, ogni nuovo progetto viene avviato con una rete predefinita. Se la rete predefinita per il progetto è disattivata o è stata eliminata, devi avere una rete nel progetto per la quale il tuo account utente dispone del ruolo Utente di rete Compute (roles/compute.networkUser).

Creare un set di dati e una tabella BigQuery

Crea un set di dati e una tabella BigQuery con lo schema appropriato per il tuo argomento Pub/Sub utilizzando la console Google Cloud.

In questo esempio, il nome del set di dati è taxirides e il nome del è realtime. Per creare il set di dati e la tabella:

  1. Vai alla pagina BigQuery.
    Vai a BigQuery
  2. Nel riquadro Explorer, accanto al progetto in cui vuoi creare il del set di dati, fai clic su Visualizza le azioni, quindi fai clic su Crea set di dati.
  3. Nel riquadro Crea set di dati, segui questi passaggi:
    1. In ID set di dati, inserisci taxirides. Gli ID set di dati sono univoci per ogni progetto Google Cloud.
    2. Per Tipo di località, scegli Più regioni e poi Stati Uniti (più regioni negli Stati Uniti). I set di dati pubblici sono archiviati nella località US con più regioni. Per semplicità, inserisci il tuo set di dati nella stessa posizione.
    3. Lascia invariate le altre impostazioni predefinite e fai clic su Crea set di dati.
  4. Nel riquadro Explorer, espandi il progetto.
  5. Accanto al set di dati taxirides, fai clic su Visualizza le azioni, poi fai clic su Crea tabella.
  6. Nel riquadro Crea tabella, segui questi passaggi:
    1. Nella sezione Origine, per Crea tabella da, seleziona Tabella vuota.
    2. Nella sezione Destinazione, in Tabella, inserisci realtime.
    3. Nella sezione Schema, fai clic sull'opzione di attivazione/disattivazione Modifica come testo e incolla la seguente definizione di schema nel riquadro:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Nella sezione Impostazioni di partizionamento e clustering, per Partizionamento, seleziona il campo timestamp.
  7. Lascia invariate le altre impostazioni predefinite e fai clic su Crea tabella.

esegui la pipeline.

Esegui una pipeline in modalità flusso utilizzando il modello Da Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in arrivo dall'argomento di input.

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Fai clic su Crea job da modello.
  3. Inserisci taxi-data come Nome job per il tuo job Dataflow.
  4. In Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery.
  5. In Tabella di output BigQuery, inserisci quanto segue:
    PROJECT_ID:taxirides.realtime

    Sostituisci PROJECT_ID con l'ID del progetto in cui hai creato il set di dati BigQuery.

  6. Espandi Parametri facoltativi.
  7. In Argomento di input Pub/Sub, fai clic su Inserisci argomento manualmente.
  8. Nella finestra di dialogo, in Nome argomento, inserisci quanto segue e poi fai clic su Salva:
    projects/pubsub-public-data/topics/taxirides-realtime

    Questo argomento Pub/Sub disponibile pubblicamente si basa su New York Taxi e set di dati aperto della Limousine Commission. Di seguito è riportato un messaggio di esempio di questo argomento, in il formato JSON:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  9. Per Località temporanea, inserisci quanto segue:
    gs://BUCKET_NAME/temp/

    Sostituisci BUCKET_NAME con il nome del tuo account Cloud Storage di sincronizzare la directory di una VM con un bucket. La cartella temp archivia i file temporanei, ad esempio un job di pipeline in fasi.

  10. Se il progetto non dispone di una rete predefinita, inserisci una Rete e una Subnet. Per saperne di più, consulta la sezione Specificare una rete e una sottorete.
  11. Fai clic su Esegui job.

Visualizza i tuoi risultati

Per visualizzare i dati scritti nella tabella realtime:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Fai clic su Crea una nuova query. Si apre una nuova scheda Editor.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Sostituisci PROJECT_ID con l'ID del progetto in cui creato il tuo set di dati BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a un minuto.

  3. Fai clic su Esegui.

    La query restituisce le righe che sono state aggiunte alla tabella nelle ultime 24 ore. Puoi anche eseguire query utilizzando SQL standard.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, segui questi passaggi.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per la guida rapida.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina le singole risorse

Se vuoi mantenere il progetto Google Cloud utilizzato in questo walkthrough, elimina le singole risorse:

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Seleziona il job di flussi di dati dall'elenco dei job.
  3. Nella barra di navigazione, fai clic su Interrompi.
  4. Nella finestra di dialogo Arresta job, annulla o svuota la pipeline, quindi fai clic su Arresta job.
  5. Vai alla pagina BigQuery.
    Vai a BigQuery
  6. Nel riquadro Explorer, espandi il progetto.
  7. Accanto al set di dati che vuoi eliminare, fai clic su Visualizza azioni e poi su Apri.
  8. Nel riquadro dei dettagli, fai clic su Elimina set di dati e segui le istruzioni.
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

Passaggi successivi