Lee un flujo de cambios con Java

La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel que permiten procesar registros de cambios en los datos. Sin embargo, en la mayoría de los casos, te recomendamos transmitir los cambios con Dataflow en lugar de usar los métodos descritos en esta página, ya que Dataflow controla las divisiones y combinaciones de partición por ti.

Antes de comenzar

Antes de leer un flujo de cambios con Java, asegúrate de familiarizarte con la descripción general de los flujos de cambios. Luego, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java de esta página desde un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Instala Google Cloud CLI.
  2. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  3. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login

Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

Si deseas obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Configura las credenciales predeterminadas de la aplicación para el código que se ejecuta en Google Cloud..

Habilita un flujo de cambios

Debes habilitar un flujo de cambios en una tabla antes de poder leerla. También puedes crear una tabla nueva con un flujo de cambios habilitado.

Funciones obligatorias

A fin de obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pídele a tu administrador que te otorgue el siguiente rol de IAM.

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla desde la que planeas transmitir cambios.

Cómo agregar la biblioteca cliente de Java como una dependencia

Agrega un código similar al siguiente a tu archivo pom.xml de Maven. Reemplaza VERSION por la versión de la biblioteca cliente que usas. La versión debe ser 2.21.0 o posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Cómo determinar las particiones de la tabla

Para comenzar a realizar solicitudes ReadChangeStream, debes conocer las particiones de tu tabla. Esto se puede determinar con el método GenerateInitialChangeStreamPartitions. En el siguiente ejemplo, se muestra cómo usar este método para obtener un flujo de ByteStringRanges que representa cada partición en la tabla. Cada ByteStringRange contiene la clave de inicio y fin de una partición.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Procesa cambios para cada partición

Luego, puedes procesar los cambios de cada partición con el método ReadChangeStream. Este es un ejemplo de cómo abrir una transmisión para una partición, a partir de la hora actual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery acepta los siguientes argumentos:

  • Partición de transmisión (obligatorio): La partición desde la que se transmitirán los cambios
  • Uno de los siguientes:
    • Hora de inicio: marca de tiempo de confirmación desde la que se comienza a procesar los cambios.
    • Tokens de continuación: tokens que representan una posición desde la cual reanudar la transmisión
  • Hora de finalización (opcional): Marca de tiempo de confirmación para detener el procesamiento de cambios cuando se alcancen. Si no proporcionas un valor, la transmisión continúa leyendo.
  • Duración de la señal de monitoreo de funcionamiento (opcional): Frecuencia de los mensajes de señal de monitoreo de funcionamiento cuando no hay cambios nuevos (la configuración predeterminada es de cinco segundos)

Cambia el formato de registro de la transmisión

Un registro de flujos de cambios que se muestra es uno de los siguientes tres tipos de respuesta:

  • ChangeStreamMutation: Es un mensaje que representa un registro de cambios en los datos.

  • CloseStream: Es un mensaje que indica que el cliente debe dejar de leer desde la transmisión.

    • Estado: indica el motivo por el que se cierra la transmisión. Uno de los siguientes:
      • OK: Se alcanzó la hora de finalización para la partición determinada.
      • OUT_OF_RANGE: La partición dada ya no existe, lo que significa que se realizaron divisiones o combinaciones en esta partición. Se deberá crear una solicitud ReadChangeStream nueva para cada partición nueva.
    • NewPartitions: Brinda información actualizada de partición sobre las respuestas de OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: Lista de tokens que se usan para reanudar solicitudes ReadChangeStream nuevas desde la misma posición. Una por NewPartition.
  • Heartbeat: Es un mensaje periódico con información que se puede usar para controlar el estado de la transmisión.

    • EstimatedLowWatermark: Estimación de la marca de agua baja de la partición determinada
    • ContinuationToken: Token para reanudar la transmisión de la partición determinada desde la posición actual.

Contenido del registro de cambios de datos

Cada registro de cambios en los datos contiene lo siguiente:

  • Entradas: cambios realizados en la fila, incluidos uno o más de los siguientes elementos:
    • Escritura
      • Familia de columnas
      • Calificador de columna
      • Marca de tiempo
      • Valor
    • Eliminación de celdas
      • Familia de columnas
      • Calificador de columna
      • Rango de marca de tiempo
    • La eliminación de una familia de columnas
      • Familia de columnas
      • Eliminación de una fila: La eliminación de una fila se convierte en una lista de eliminaciones de familias de columnas para cada familia de columnas en la que la fila tiene datos.
  • Clave de fila: el identificador de la fila modificada
  • Tipo de cambio: iniciado por el usuario o recolección de elementos no utilizados
  • ID del clúster que recibió el cambio
  • Marca de tiempo de confirmación: la hora del servidor cuando se confirmó el cambio en la tabla
  • Disyuntor: un valor que permite que la aplicación que lee la transmisión use la política de resolución de conflictos integrada de Bigtable
  • Token: La aplicación consumidora lo usa para reanudar la transmisión en caso de que se interrumpa.
  • Marca de agua estimada baja: El tiempo estimado desde que la partición del registro alcanzó la replicación en todos los clústeres Para obtener más información, consulta Particiones y Marcas de agua.

Para obtener detalles adicionales sobre los campos en un registro de cambios de datos, consulta la referencia de la API para ReadChangeStream.

Cómo controlar los cambios en las particiones

Cuando las particiones de una tabla cambian, las solicitudes ReadChangeStream muestran un mensaje CloseStream con la información necesaria para reanudar la transmisión de las particiones nuevas.

En una división, esta contendrá varias particiones nuevas y un ContinuationToken correspondiente para cada una. Para reanudar la transmisión de las particiones nuevas desde la misma posición, realiza una solicitud ReadChangeStream nueva para cada partición nueva con su token correspondiente.

Por ejemplo, si transmites la partición [A,C) y esta se divide en dos particiones, [A,B) y [B,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Para que una combinación se reanude desde la misma partición, debes enviar una nueva solicitud ReadChangeStream que contenga cada token de las particiones combinadas.

Por ejemplo, si transmites dos particiones, [A,B) y [B,C), y se fusionan en la partición [A,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para reanudar la partición de transmisión [A, C) desde la misma posición, envía una ReadChangeStreamQuery como el siguiente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

¿Qué sigue?