Elaborare un flusso di modifiche Bigtable

Questo tutorial mostra come eseguire il deployment di una pipeline di dati in Dataflow per un flusso in tempo reale delle modifiche al database provenienti dal flusso di modifiche di una tabella Bigtable. L'output della pipeline viene scritto in una serie di file su Cloud Storage.

Viene fornito un set di dati di esempio per un'applicazione di ascolto di musica. In questo tutorial, monitori i brani ascoltati e poi classifichi i primi cinque in un periodo di tempo.

Questo tutorial è rivolto agli utenti tecnici che hanno familiarità con la scrittura di codice e il deployment di pipeline di dati su Google Cloud.

Prepara l'ambiente

Ottieni il codice

Clona il repository che contiene il codice campione. Se hai già scaricato questo repository, esegui il pull per ottenere l'ultima versione.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Crea un bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Crea un'istanza Bigtable

    Puoi utilizzare un'istanza esistente per questo tutorial o crearne una con le configurazioni predefinite in una regione vicina a te.

    Creare una tabella

    L'applicazione di esempio monitora i brani ascoltati dagli utenti e memorizza gli eventi di ascolto in Bigtable. Crea una tabella con uno stream di modifiche abilitato che abbia una famiglia di colonne (cf) e una colonna (song) e utilizzi gli ID utente per le chiavi di riga.

    Crea la tabella.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto che stai utilizzando
    • BIGTABLE_INSTANCE_ID: l'ID dell'istanza che conterrà la nuova tabella

    Avvia la pipeline

    Questa pipeline trasforma lo stream delle modifiche nel seguente modo:

    1. Legge il flusso di modifiche
    2. Recupera il titolo del brano
    3. Raggruppa gli eventi di ascolto dei brani in finestre di N secondi
    4. Conteggia i primi cinque brani
    5. Restituisce i risultati

    Esegui la pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Sostituisci BIGTABLE_REGION con l'ID della regione in cui si trova l'istanza Bigtable, ad esempio us-east5.

    Informazioni sulla pipeline

    I seguenti snippet di codice della pipeline possono aiutarti a comprendere il codice che stai eseguendo.

    Lettura del flusso di modifiche

    Il codice in questo esempio configura il flusso di origine con i parametri per l'istanza e la tabella Bigtable specifiche.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Recupero del titolo del brano

    Quando viene ascoltata una canzone, il nome viene scritto nella famiglia di colonne cf e nel qualificatore di colonna song, quindi il codice estrae il valore dalla mutazione dello stream delle modifiche e lo restituisce al passaggio successivo della pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Conteggio dei primi cinque brani

    Puoi utilizzare le funzioni Beam integrate Count e Top.of per ottenere le prime cinque canzoni nella finestra corrente.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Output dei risultati

    Questa pipeline scrive i risultati nell'output standard e nei file. Per i file, le scritture vengono suddivise in gruppi di 10 elementi o segmenti di un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Visualizzare la pipeline

    1. Nella console Google Cloud , vai alla pagina Dataflow.

      Vai a Dataflow

    2. Fai clic sul job con un nome che inizia con song-rank.

    3. Nella parte inferiore dello schermo, fai clic su Mostra per aprire il pannello dei log.

    4. Fai clic su Log del worker per monitorare i log di output dello stream di modifiche.

    Operazioni di scrittura dei flussi

    Utilizza la CLI cbt per scrivere un numero di ascolti di brani per vari utenti nella tabella song-rank. Questi dati vengono scritti nell'arco di alcuni minuti per simulare gli ascolti in streaming dei brani nel tempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Visualizzare l'output

    Leggi l'output su Cloud Storage per visualizzare i brani più popolari.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Output di esempio:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]