Elaborare un flusso di modifiche Bigtable


Questo tutorial mostra come eseguire il deployment di una pipeline di dati in Dataflow per un un flusso in tempo reale di modifiche al database provenienti dal database di modifiche in tempo reale. L'output della pipeline viene scritto in una serie di file di archiviazione ideale in Cloud Storage.

Viene fornito un set di dati di esempio per un'applicazione di ascolto di musica. In questo di questo tutorial ti permette di tenere traccia dei brani che hai ascoltato e di classificarti tra i primi cinque punto.

Questo tutorial è rivolto a utenti tecnici che hanno dimestichezza con la scrittura di codice e con il deployment di pipeline di dati su Google Cloud.

Obiettivi

Questo tutorial illustra come:

  • Crea una tabella Bigtable con un flusso di modifiche abilitato.
  • Esegui il deployment di una pipeline su Dataflow che trasformi e generi l'output del flusso di modifiche.
  • Visualizzare i risultati della pipeline di dati.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Una volta completate le attività descritte in questo documento, puoi evitare la fatturazione continua eliminando le risorse che hai creato. Per ulteriori informazioni, consulta la pagina Pulizia.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Aggiorna e installa il Interfaccia a riga di comando cbt di Google.
    gcloud components update
    gcloud components install cbt

Prepara l'ambiente

Ottieni il codice

Clonare il repository che contiene il codice campione. Se lo hai già scaricato in precedenza repository ed eseguire il pull per ottenere la versione più recente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Crea un bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.
  • Crea un'istanza Bigtable

    Per questo tutorial puoi utilizzare un'istanza esistente o crearne una con le configurazioni predefinite in una regione vicino a te.

    Creare una tabella

    L'applicazione di esempio monitora i brani ascoltati dagli utenti e memorizza gli eventi di ascolto in Bigtable. Crea una tabella con un flusso di modifiche abilitato che ha una famiglia di colonne (cf) e una colonna (brano) e utilizza gli ID utente per le chiavi di riga.

    Crea la tabella.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto in uso
    • BIGTABLE_INSTANCE_ID: l'ID dell'istanza che dovrà contenere la nuova tabella

    Avvia la pipeline

    Questa pipeline trasforma il flusso di modifiche procedendo nel seguente modo:

    1. Legge il flusso di modifiche
    2. Recupera il nome del brano
    3. Raggruppa gli eventi di ascolto dei brani in finestre di n secondi
    4. Conta i cinque brani più ascoltati
    5. Restituisce i risultati

    Esegui la pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Sostituisci BIGTABLE_REGION con l'ID della regione in cui si trova l'istanza Bigtable, ad esempio us-east5.

    Informazioni sulla pipeline

    I seguenti snippet di codice della pipeline possono aiutarti a comprendere il codice in esecuzione.

    Lettura del flusso di modifiche

    Il codice in questo esempio configura lo stream di origine con i parametri per il valore di un'istanza e una tabella Bigtable specifiche.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Recupero del titolo del brano

    Quando un brano viene ascoltato, il nome del brano viene scritto nella famiglia di colonne cf e il qualificatore di colonna song, in modo che il codice estragga il valore dalla modifica la mutazione del flusso e la invia al passaggio successivo della pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Conteggio dei cinque brani più ascoltati

    Puoi utilizzare le funzioni Beam integrate Count e Top.of per visualizzare le prime cinque brani nella finestra corrente.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Output dei risultati

    Questa pipeline scrive i risultati sia in output standard che in file. Per , le scritture vengono suddivise in gruppi di 10 elementi o segmenti di un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Visualizza la pipeline

    1. Nella console Google Cloud, vai alla pagina Dataflow.

      Vai a Dataflow

    2. Fai clic sul job il cui nome inizia con song-rank.

    3. Nella parte inferiore dello schermo, fai clic su Mostra per aprire il riquadro dei log.

    4. Fai clic su Log worker per monitorare i log di output del flusso di modifiche.

    Operazioni di scrittura flussi

    Utilizza la Interfaccia a riga di comando cbt di scrivere una serie di brani ascoltati da vari utenti la tabella song-rank. È progettata per scrivere nell'arco di pochi minuti per simulare ascolta brani in streaming nel tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Visualizza l'output

    Leggi l'output su Cloud Storage per vedere i brani più popolari.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Output di esempio:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Esegui la pulizia

    Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo tutorial, elimina il progetto che contiene le risorse oppure mantieni il progetto ed elimina le singole risorse.

    Elimina il progetto

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Elimina singole risorse

    1. Elimina il bucket e i file.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Disattiva il flusso di modifiche nella tabella.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Elimina la tabella song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Arresta la pipeline di modifiche in tempo reale.

      1. Elenca i job per ottenere l'ID job.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Annullare il job.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Sostituisci JOB_ID con l'ID job visualizzato dopo il comando precedente.

    Passaggi successivi