Procesa un flujo de cambios de Bigtable


En este instructivo, se muestra cómo implementar una canalización de datos en Dataflow para un flujo en tiempo real de cambios en la base de datos procedentes de los datos de una tabla de Bigtable flujo de cambios. El resultado de la canalización se escribe en una serie de archivos en en Google Cloud Storage.

Se proporciona un conjunto de datos de ejemplo para una aplicación de escucha de música. En este de audio, realizas un seguimiento de las canciones que se escuchan y, luego, clasificas las cinco primeras en un período.

Este instructivo está dirigido a usuarios técnicos familiarizados con la escritura de código y para implementar canalizaciones de datos en Google Cloud.

Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  • Crear una tabla de Bigtable con un flujo de cambios habilitado
  • Implementar una canalización en Dataflow que transforme y muestre el resultado flujo de cambios.
  • Visualiza los resultados de tu canalización de datos.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Habilita las APIs de Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Instala Google Cloud CLI.
  8. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  11. Habilita las APIs de Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Actualiza e instala el CLI de cbt .
    gcloud components update
    gcloud components install cbt
    

Prepare el entorno

Cómo obtener el código

Clona el repositorio que contiene el código de muestra. Si ya descargaste esto anteriormente de destino, extrae para obtener la versión más reciente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Crea un bucket

  • Crear un bucket de Cloud Storage:
    gcloud storage buckets create gs://BUCKET_NAME
    Reemplaza BUCKET_NAME por un nombre de bucket que cumpla con los requisitos de nombre del bucket:
  • Crear una instancia de Bigtable.

    Puedes usar una instancia existente para este instructivo o crear una instancia con la configuración predeterminada en una región cercana.

    Crea una tabla

    La aplicación de ejemplo realiza un seguimiento de las canciones que los usuarios escuchan y almacena las escuchar eventos en Bigtable. Crea una tabla con un flujo de cambios habilitado con una familia de columnas (cf) y una columna (song), y que utiliza los ID de usuario para las claves de fila.

    Crea la tabla.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID del proyecto que estás usando.
    • BIGTABLE_INSTANCE_ID: Es el ID de la instancia que contendrá la tabla nueva.

    Comienza la canalización

    Esta canalización transforma el flujo de cambios de la siguiente manera:

    1. Lee el flujo de cambios
    2. Obtiene el nombre de la canción
    3. Agrupa los eventos de reproducción de la canción en ventanas de N segundos.
    4. Cuenta las cinco canciones más populares
    5. Muestra los resultados

    Ejecutar la canalización

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Reemplaza BIGTABLE_REGION por el ID de la región en la que se encuentra tu instancia de Bigtable, como us-east5.

    Comprende la canalización

    Los siguientes fragmentos de código de la canalización pueden ayudarte a comprender la el código que ejecutas.

    Cómo leer el flujo de cambios

    El código de este ejemplo configura la transmisión de origen con los parámetros del una instancia y una tabla de Bigtable específicas.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obteniendo el nombre de la canción

    Cuando se escucha una canción, el nombre de la canción se escribe en la familia de columnas cf y el calificador de columna song, de modo que el código extraiga el valor del cambio de transmisión y la lleva al siguiente paso de la canalización.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    El recuento de las cinco canciones más populares

    Puedes usar las funciones integradas de Beam Count y Top.of para obtener las cinco canciones en la ventana actual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Cómo mostrar los resultados

    Esta canalización escribe los resultados en la salida estándar y en los archivos. Para el las operaciones de escritura en grupos de 10 elementos o en segmentos de un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Visualiza la canalización

    1. En la consola de Google Cloud, ve a la página Dataflow.

      Ir a Dataflow

    2. Haz clic en el trabajo que tenga un nombre que comience con song-rank.

    3. En la parte inferior de la pantalla, haz clic en Mostrar para abrir el panel de registros.

    4. Haz clic en Registros de trabajador para supervisar los registros de salida del flujo de cambios.

    Escrituras de transmisión

    Usa el CLI de cbt escribir una serie de canciones para que diversos usuarios puedan escuchar la tabla song-rank. Se diseñó para escribir en unos minutos para simular la canción que se está reproduciendo a lo largo del tiempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Revise el resultado.

    Lee el resultado en Cloud Storage para ver las canciones más populares.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Salida de ejemplo:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Limpia

    Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

    Borra el proyecto

      Borra un proyecto de Google Cloud:

      gcloud projects delete PROJECT_ID

    Borra los recursos individuales

    1. Borra el bucket y los archivos.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Inhabilita el flujo de cambios en la tabla.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Borra la tabla song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Detén la canalización del flujo de cambios.

      1. Enumera los trabajos para obtener el ID del trabajo.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Cancela el trabajo.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Reemplaza JOB_ID por el ID de trabajo que se muestra después del comando anterior.

    ¿Qué sigue?