Lee un flujo de cambios con Java
La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel para procesar registros de cambios de datos. Sin embargo, en la mayoría de los casos, te recomendamos que transmitas los cambios con Dataflow en lugar de usar los métodos que se describen en esta página, ya que Dataflow controla las divisiones y las combinaciones de particiones por ti.
Antes de comenzar
Antes de leer un flujo de cambios con Java, asegúrate de estar familiarizado con la descripción general de los flujos de cambios. Luego, completa los siguientes requisitos previos.
Configura la autenticación
Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
Para obtener más información, consulta Set up authentication for a local development environment.
Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.
Habilita un flujo de cambios
Debes habilitar un flujo de cambios en una tabla para poder leerla. También puedes crear una tabla nueva con un flujo de cambios habilitado.
Roles obligatorios
Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pídele a tu administrador que te otorgue el siguiente rol de IAM.
- Administrador de Bigtable (
roles/bigtable.admin
) en la instancia de Bigtable que contiene la tabla desde la que deseas transmitir cambios
Agrega la biblioteca cliente de Java como una dependencia
Agrega código similar al siguiente a tu archivo pom.xml
de Maven. Reemplaza VERSION
por la versión de la biblioteca cliente que usas. La versión debe ser 2.21.0 o posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determina las particiones de la tabla
Para comenzar a realizar solicitudes ReadChangeStream
, debes conocer las particiones de tu tabla. Esto se puede determinar con el método GenerateInitialChangeStreamPartitions
. En el siguiente ejemplo, se muestra cómo
usar este método para obtener un flujo de
ByteStringRanges
que representa cada partición de la tabla. Cada ByteStringRange
contiene la clave de inicio y finalización de una partición.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Procesa los cambios de cada partición
Luego, puedes procesar los cambios de cada partición con el método ReadChangeStream
. Este es un ejemplo de cómo abrir un flujo para una partición, a partir de la hora actual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
acepta los siguientes argumentos:
- Partición de transmisión (obligatoria): Es la partición desde la que se transmitirán los cambios.
- Uno de los siguientes:
- Hora de inicio: Es la marca de tiempo de confirmación para comenzar a procesar los cambios a partir de ella.
- Tokens de Continuation: Son tokens que representan una posición desde la que se reanuda la transmisión.
- Hora de finalización (opcional): Es la marca de tiempo de confirmación para detener el procesamiento de cambios cuando se alcanza. Si no proporcionas un valor, la transmisión seguirá leyendo.
- Duración del intervalo de tiempo de actividad (opcional): Es la frecuencia de los mensajes de intervalo de tiempo de actividad cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos).
Cambia el formato de registro de flujo
Un registro de flujo de cambios que se muestra es uno de los tres tipos de respuesta:
ChangeStreamMutation
: Es un mensaje que representa un registro de cambios de datos.CloseStream
: Es un mensaje que indica que el cliente debe dejar de leer de la transmisión.- Estado: Indica el motivo por el que se cerró la transmisión. Uno de los siguientes:
OK
: Se alcanzó la hora de finalización de la partición determinada.OUT_OF_RANGE
: La partición determinada ya no existe, lo que significa que se produjeron divisiones o combinaciones en esta partición. Se deberá crear una solicitudReadChangeStream
nueva para cada partición nueva.
NewPartitions
: Proporciona la información de partición actualizada en las respuestas deOUT_OF_RANGE
.ChangeStreamContinuationTokens
: Es la lista de tokens que se usan para reanudar solicitudesReadChangeStream
nuevas desde la misma posición. Uno porNewPartition
.
- Estado: Indica el motivo por el que se cerró la transmisión. Uno de los siguientes:
Heartbeat
: Es un mensaje periódico con información que se puede usar para verificar el estado del flujo.EstimatedLowWatermark
: Estimación de la marca de agua baja para la partición determinadaContinuationToken
: Es un token para reanudar la transmisión de la partición determinada desde la posición actual.
Contenido de los registros de cambios de datos
Para obtener información sobre los registros de flujo de cambios, consulta Qué contiene un registro de cambios de datos.
Controla los cambios en las particiones
Cuando cambian las particiones de una tabla, las solicitudes de ReadChangeStream
muestran un mensaje CloseStream
con la información necesaria para reanudar la transmisión desde las particiones nuevas.
Para una división, contendrá varias particiones nuevas y un ContinuationToken
correspondiente para cada una. Para reanudar la transmisión de las particiones nuevas desde la misma posición, debes realizar una nueva solicitud ReadChangeStream
para cada partición nueva con su token correspondiente.
Por ejemplo, si transmites la partición [A,C)
y se divide en dos particiones, [A,B)
y [B,C)
, puedes esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Para una combinación, para reanudar desde la misma partición, debes enviar una nueva
solicitud ReadChangeStream
que contenga cada token de las particiones combinadas.
Por ejemplo, si transmites dos particiones, [A,B)
y [B,C)
, y se combinan en la partición [A,C)
, puedes esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para reanudar la transmisión de la partición [A, C)
desde la misma posición, envía un ReadChangeStreamQuery
como el siguiente:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));