Creare una pipeline di streaming utilizzando un modello Dataflow

Questa guida rapida mostra come creare una pipeline di streaming utilizzando un modello Dataflow fornito da Google. Nello specifico, questa guida rapida utilizza il modello Da Pub/Sub a BigQuery come esempio.

Il modello Da Pub/Sub a BigQuery è una pipeline di streaming che può leggere i messaggi in formato JSON da un argomento Pub/Sub e scriverli in una tabella BigQuery.


Per seguire le indicazioni dettagliate per questa attività direttamente nella Google Cloud console, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. Crea un bucket Cloud Storage:
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets

    2. Click Create.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      1. For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      2. In the Choose where to store your data section, do the following:
        1. Select a Location type.
        2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
        3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

          Set up cross-bucket replication

          1. In the Bucket menu, select a bucket.
          2. In the Replication settings section, click Configure to configure settings for the replication job.

            The Configure cross-bucket replication pane appears.

            • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
            • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
            • Click Done.
      3. In the Choose how to store your data section, do the following:
        1. In the Set a default class section, select the following: Standard.
        2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
      4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
      5. In the Choose how to protect object data section, do the following:
        • Select any of the options under Data protection that you want to set for your bucket.
          • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
          • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
          • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
            • To enable Object Retention Lock, click the Enable object retention checkbox.
            • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
        • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
    4. Click Create.
  9. Copia quanto segue, perché ti servirà in una sezione successiva:
  10. Per completare i passaggi di questa guida rapida, il tuo account utente deve disporre del ruolo Amministratore Dataflow e del ruolo Service Account User. Il service account Compute Engine predefinito deve avere il ruolo Worker Dataflow, il ruolo Amministratore oggetti Storage, il ruolo Editor Pub/Sub, il ruolo Editor dati BigQuery e il ruolo Visualizzatore. Per aggiungere i ruoli richiesti nella console Google Cloud :

    1. Vai alla pagina IAM e seleziona il tuo progetto.
      Vai a IAM
    2. Nella riga contenente il tuo account utente, fai clic su Modifica entità. Fai clic su Aggiungi un altro ruolo e aggiungi i seguenti ruoli: Amministratore Dataflow e Utente service account.
    3. Fai clic su Salva.
    4. Nella riga contenente il service account predefinito di Compute Engine (PROJECT_NUMBER-compute@developer.gserviceaccount.com), fai clic su Modifica entità.
    5. Fai clic su Aggiungi un altro ruolo e aggiungi i seguenti ruoli: Worker Dataflow, Amministratore oggetti Storage, Editor Pub/Sub, Editor dati BigQuery, Visualizzatore.
    6. Fai clic su Salva.

      Per saperne di più sulla concessione dei ruoli, consulta Concedi un ruolo IAM utilizzando la console.

  11. Per impostazione predefinita, ogni nuovo progetto viene avviato con una rete predefinita. Se la rete predefinita per il tuo progetto è disattivata o è stata eliminata, devi disporre di una rete nel tuo progetto per la quale il tuo account utente dispone del ruolo Utente di rete Compute (roles/compute.networkUser).

Creare un set di dati e una tabella BigQuery

Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento Pub/Sub utilizzando la console Google Cloud .

In questo esempio, il nome del set di dati è taxirides e il nome della tabella è realtime. Per creare questo set di dati e questa tabella:

  1. Vai alla pagina BigQuery.
    Vai a BigQuery
  2. Nel riquadro Explorer, accanto al progetto in cui vuoi creare il set di dati, fai clic su Visualizza azioni, quindi fai clic su Crea set di dati.
  3. Nel riquadro Crea set di dati, segui questi passaggi:
    1. In ID set di dati, inserisci taxirides. Gli ID set di dati sono univoci per ogni Google Cloud progetto.
    2. Per Tipo di località, scegli Più regioni e poi Stati Uniti (più regioni negli Stati Uniti). I set di dati pubblici sono archiviati nella località multiregionale US. Per semplicità, inserisci il tuo set di dati nella stessa località.
    3. Lascia invariate le altre impostazioni predefinite e fai clic su Crea set di dati.
  4. Nel riquadro Explorer, espandi il progetto.
  5. Accanto al set di dati taxirides, fai clic su Visualizza azioni, quindi fai clic su Crea tabella.
  6. Nel riquadro Crea tabella, segui questi passaggi:
    1. Nella sezione Origine, per Crea tabella da, seleziona Tabella vuota.
    2. Nella sezione Destinazione, in Tabella, inserisci realtime.
    3. Nella sezione Schema, fai clic sull'opzione di attivazione/disattivazione Modifica come testo e incolla la seguente definizione di schema nel riquadro:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. Nella sezione Impostazioni di partizionamento e clustering, per Partizionamento, seleziona il campo timestamp.
  7. Lascia invariate le altre impostazioni predefinite e fai clic su Crea tabella.

esegui la pipeline.

Esegui una pipeline in modalità flusso utilizzando il modello Da Pub/Sub a BigQuery fornito da Google. La pipeline riceve i dati in entrata dall'argomento di input.

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Fai clic su Crea job da modello.
  3. Inserisci taxi-data come Nome job per il tuo job Dataflow.
  4. In Modello Dataflow, seleziona il modello Da Pub/Sub a BigQuery.
  5. In Tabella di output BigQuery, inserisci quanto segue:
    PROJECT_ID:taxirides.realtime

    Sostituisci PROJECT_ID con l'ID del progetto in cui hai creato il set di dati BigQuery.

  6. Nella sezione Parametri facoltativi dell'origine, fai clic su Inserisci argomento manualmente in Argomento di input Pub/Sub.
  7. Nella finestra di dialogo, in Nome argomento, inserisci quanto segue e quindi fai clic su Salva:
    projects/pubsub-public-data/topics/taxirides-realtime

    Questo argomento Pub/Sub disponibile pubblicamente si basa sul set di dati aperto della NYC Taxi & Limousine Commission. Di seguito è riportato un messaggio di esempio di questo argomento, in formato JSON:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. Per Località temporanea, inserisci quanto segue:
    gs://BUCKET_NAME/temp/

    Sostituisci BUCKET_NAME con il nome del tuo bucket Cloud Storage. La cartella temp archivia i file temporanei, ad esempio il job della pipeline di gestione temporanea.

  9. Se il tuo progetto non ha una rete predefinita, inserisci una rete e una subnet. Per saperne di più, consulta Specificare una rete e una subnet.
  10. Fai clic su Esegui job.

Visualizza i tuoi risultati

Per visualizzare i dati scritti nella tabella realtime:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Fai clic su Crea una nuova query. Si apre una nuova scheda Editor.

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Sostituisci PROJECT_ID con l'ID progetto del progetto in cui hai creato il set di dati BigQuery. La visualizzazione dei dati nella tabella può richiedere fino a cinque minuti.

  3. Fai clic su Esegui.

    La query restituisce le righe che sono state aggiunte alla tabella nelle ultime 24 ore. Puoi anche eseguire query utilizzando SQL standard.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto Google Cloud che hai creato per la guida rapida.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina le singole risorse

Se vuoi conservare il progetto Google Cloud che hai utilizzato in questa guida rapida, elimina le singole risorse:

  1. Vai alla pagina Job Dataflow:
    Vai a Job
  2. Seleziona il tuo lavoro di streaming dall'elenco dei lavori.
  3. Nella navigazione, fai clic su Stop.
  4. Nella finestra di dialogo Arresta job, annulla o svuota la pipeline, quindi fai clic su Arresta job.
  5. Vai alla pagina BigQuery.
    Vai a BigQuery
  6. Nel riquadro Explorer, espandi il progetto.
  7. Accanto al set di dati che vuoi eliminare, fai clic su Visualizza azioni, quindi fai clic su Apri.
  8. Nel riquadro dei dettagli, fai clic su Elimina set di dati, quindi segui le istruzioni.
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

Passaggi successivi