Elabora le modifiche in tempo reale di Bigtable


Questo tutorial mostra come eseguire il deployment di una pipeline di dati in Dataflow per un flusso in tempo reale di modifiche al database provenienti dal flusso di modifiche di una tabella Bigtable. L'output della pipeline viene scritto in una serie di file in Cloud Storage.

Viene fornito un set di dati di esempio per un'applicazione di ascolto di musica. In questo tutorial, traccerai i brani ascoltati e classificherai i primi cinque in un determinato periodo di tempo.

Questo tutorial è rivolto agli utenti tecnici che hanno familiarità con la scrittura di codice e il deployment di pipeline di dati in Google Cloud.

Obiettivi

Questo tutorial ti mostra come effettuare le seguenti operazioni:

  • Crea una tabella Bigtable con un flusso di modifiche abilitato.
  • Esegui il deployment su Dataflow di una pipeline che trasforma e restituisce il flusso di modifiche.
  • Visualizza i risultati della pipeline di dati.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  5. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  6. Abilita le API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Installa Google Cloud CLI.
  8. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  9. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID

      Sostituisci PROJECT_ID con un nome per il progetto Google Cloud che stai creando.

    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID

      Sostituisci PROJECT_ID con il nome del tuo progetto Google Cloud.

  10. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  11. Abilita le API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Aggiorna e installa l'interfaccia a riga di comando cbt.
    gcloud components update
    gcloud components install cbt
    

prepara l'ambiente

Ottieni il codice

clona il repository che contiene il codice campione. Se hai già scaricato questo repository, esegui il pull per ottenere la versione più recente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

crea un bucket

  • Crea un bucket Cloud Storage:
    gcloud storage buckets create gs://BUCKET_NAME
    Sostituisci BUCKET_NAME con un nome di bucket che soddisfi i requisiti di denominazione dei bucket.
  • Crea un'istanza Bigtable

    Puoi utilizzare un'istanza esistente per questo tutorial o creare un'istanza con le configurazioni predefinite in una regione nella tua zona.

    Creare una tabella

    L'applicazione di esempio tiene traccia dei brani ascoltati dagli utenti e archivia gli eventi di ascolto in Bigtable. Crea una tabella con una modifica in tempo reale attivata che abbia una famiglia di colonne (cf) e una colonna (brano) e che utilizzi gli ID utente per le chiavi di riga.

    Crea la tabella.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto in uso
    • BIGTABLE_INSTANCE_ID: l'ID dell'istanza che conterrà la nuova tabella

    Avvia la pipeline

    Questa pipeline trasforma il flusso di modifiche effettuando quanto segue:

    1. Legge le modifiche in tempo reale
    2. Recupera il nome del brano
    3. Raggruppa gli eventi di ascolto dei brani in finestre di N secondi
    4. Conta i 5 brani più ascoltati
    5. Restituisce come output i risultati

    Eseguire la pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Sostituisci BIGTABLE_REGION con l'ID della regione in cui si trova l'istanza Bigtable, ad esempio us-east5.

    Comprendere la pipeline

    I seguenti snippet di codice della pipeline possono aiutarti a comprendere il codice in esecuzione.

    Lettura del flusso di modifiche

    Il codice in questo esempio configura il flusso di origine con i parametri per l'istanza e la tabella Bigtable specifiche.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Recupero del nome del brano in corso...

    Quando una canzone viene ascoltata, il suo nome viene scritto nella famiglia di colonne cf e nel qualificatore di colonna song, quindi il codice estrae il valore dalla modifica delle modifiche in tempo reale e lo invia al passaggio successivo della pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Conta i cinque brani più ascoltati

    Puoi usare le funzioni Beam Count e Top.of integrate per visualizzare i cinque brani più ascoltati nella finestra corrente.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Output dei risultati

    Questa pipeline scrive i risultati sia nel formato standard sia nei file. Per i file, imposta le scritture in gruppi di 10 elementi o segmenti di un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Visualizza la pipeline

    1. Nella console Google Cloud, vai alla pagina Dataflow.

      Vai a Dataflow

    2. Fai clic sul lavoro con un nome che inizia con song-ranking.

    3. Nella parte inferiore dello schermo, fai clic su Mostra per aprire il riquadro dei log.

    4. Fai clic su Log worker per monitorare i log di output del flusso di modifiche.

    Operazioni di scrittura flussi

    Utilizza l'interfaccia a riga di comando cbt per scrivere un numero di ascolti di brani per vari utenti nella tabella song-rank. È progettato per scrivere per pochi minuti per simulare l'ascolto di brani in streaming nel tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Visualizza l'output

    Leggi l'output su Cloud Storage per vedere i brani più popolari.

    gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Output di esempio:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Esegui la pulizia

    Per evitare che al tuo Account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

    Elimina il progetto

      Elimina un progetto Google Cloud:

      gcloud projects delete PROJECT_ID

    Elimina singole risorse

    1. Elimina il bucket e i file.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Disabilita il flusso di modifiche nella tabella.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Elimina la tabella song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Arresta la pipeline delle modifiche in tempo reale.

      1. Elenca i job per ottenere l'ID job.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Annulla il job.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Sostituisci JOB_ID con l'ID job visualizzato dopo il comando precedente.

    Passaggi successivi