Lee un flujo de cambios con Java
La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel a fin de procesar registros de cambios de datos. Antes de leer este documento, lee la descripción general de los cambios de transmisiones.
Antes de comenzar
Cómo habilitar un flujo de cambios
Debes habilitar un flujo de cambios en una tabla antes de poder leerla. También puedes crear una tabla nueva con una transmisión de cambios habilitada.
Roles obligatorios
A fin de obtener los permisos que necesitas para leer un flujo de cambio de Bigtable, pídele a tu administrador que te otorgue el siguiente rol de IAM.
- Administrador de Bigtable (
roles/bigtable.admin
) en la instancia de Bigtable que contiene la tabla desde la que planeas transmitir los cambios
Cómo agregar la biblioteca cliente de Java como dependencia
Agrega un código similar al siguiente a tu archivo pom.xml
de Maven. Reemplaza VERSION
por la versión de la biblioteca cliente que usas. La versión debe ser 2.21.0 o una posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determina las particiones de la tabla
Para comenzar a realizar solicitudes ReadChangeStream
, debes conocer las particiones de la tabla. Esto se puede determinar mediante el método GenerateInitialChangeStreamPartitions
. En el siguiente ejemplo, se muestra cómo usar este método para obtener un flujo de ByteStringRanges
que representa cada partición en la tabla. Cada ByteStringRange
contiene las claves de inicio y finalización de una partición.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Procesar cambios en cada partición
Luego, puedes procesar cambios para cada partición con el método ReadChangeStream
. Este es un ejemplo de cómo abrir una transmisión para una partición, a partir de la hora actual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
acepta los siguientes argumentos:
- Partición de transmisión (obligatorio): Es la partición desde la que se transmiten los cambios.
- Uno de los siguientes:
- Hora de inicio: Confirma la marca de tiempo desde la que se comienzan a procesar los cambios
- Tokens de continuación: Tokens que representan una posición desde la cual reanudar la transmisión
- Hora de finalización (opcional): Confirma la marca de tiempo para detener el procesamiento de cambios cuando se alcanza. Si no proporcionas un valor, la transmisión continúa leyendo.
- Duración de la señal de monitoreo de funcionamiento (opcional): Es la frecuencia de los mensajes de monitoreo de funcionamiento cuando no hay cambios nuevos (la configuración predeterminada es de cinco segundos).
Cómo cambiar el formato de los registros de transmisión
Un registro del flujo de cambios que se muestra es uno de los tres tipos de respuesta:
ChangeStreamMutation
: Es un mensaje que representa un registro de cambios de datos.CloseStream
: Es un mensaje que indica que el cliente debe dejar de leer desde la transmisión.- Estado: indica el motivo del cierre de la transmisión. Uno de los siguientes:
OK
: Se alcanzó la hora de finalización para la partición dada.OUT_OF_RANGE
: La partición dada ya no existe, lo que significa que se realizaron divisiones o combinaciones en esta partición. Se deberá crear una solicitudReadChangeStream
nueva para cada partición nueva.
NewPartitions
: Proporciona información de partición actualizada en las respuestasOUT_OF_RANGE
.ChangeStreamContinuationTokens
: Es la lista de tokens que se usan para reanudar nuevas solicitudesReadChangeStream
desde la misma posición. Uno porNewPartition
.
- Estado: indica el motivo del cierre de la transmisión. Uno de los siguientes:
Heartbeat
: Es un mensaje periódico con información que se puede usar para controlar el estado de la transmisión.EstimatedLowWatermark
: Estimación de la marca de agua baja para la partición determinadaContinuationToken
: Token para reanudar la transmisión de la partición dada desde la posición actual.
Contenido del registro de cambios de los datos
Cada registro de cambios de datos contiene lo siguiente:
- Entradas: cambios realizados en la fila, incluidos uno o más de los siguientes elementos:
- Escritura
- Familia de columnas
- Calificador de columna
- Marca de tiempo
- Valor
- Eliminación de celdas
- Familia de columnas
- Calificador de columna
- Rango de marca de tiempo
- Eliminación de una familia de columnas
- Familia de columnas
- Eliminación de una fila: La eliminación de una fila se convierte en una lista de eliminaciones de familias de columnas para cada familia de columnas en la que la fila tiene datos.
- Escritura
- Clave de fila: el identificador de la fila modificada
- Tipo de cambio: iniciado por el usuario o recolección de elementos no utilizados
- ID del clúster que recibió el cambio
- Marca de tiempo de confirmación: tiempo del servidor cuando se confirma el cambio en la tabla
- Desempate, un valor que permite que la aplicación que lee la transmisión use la política de resolución de conflictos integrada de Bigtable
- Token: lo usa la aplicación consumidora para reanudar la transmisión si se interrumpe
- Marca de agua baja estimada: el tiempo estimado desde que la partición del registro alcanzó la replicación en todos los clústeres. Para obtener más información, consulta Particiones y Marcas de agua.
Para obtener detalles adicionales sobre los campos de un registro de cambios de datos, consulta la referencia de la API de ReadChangeStream
.
Administra cambios en particiones
Cuando cambian las particiones de una tabla, las solicitudes ReadChangeStream
muestran un mensaje CloseStream
con la información necesaria para reanudar la transmisión desde las particiones nuevas.
En el caso de una división, esta contendrá varias particiones nuevas y un ContinuationToken
correspondiente para cada partición. Para reanudar la transmisión de particiones nuevas desde la misma posición, debes realizar una solicitud ReadChangeStream
nueva a cada partición nueva con su token correspondiente.
Por ejemplo, si transmites la partición [A,C)
y esta se divide en dos, [A,B)
y [B,C)
, puedes esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Para que se realice una combinación, a fin de reanudar desde la misma partición, debes enviar una solicitud ReadChangeStream
nueva que contenga cada token de las particiones combinadas.
Por ejemplo, si transmites dos particiones, [A,B)
y [B,C)
, y se combinan en la partición [A,C)
, puedes esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para reanudar la partición de transmisión [A, C)
desde la misma posición, debes enviar un ReadChangeStreamQuery
como el siguiente:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));
¿Qué sigue?
- Configura flujos de cambios.
- Usa el conector de Beam de Dataflow para las flujos de cambios de cambios.