Cómo leer un flujo de cambios con Java

La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel a fin de procesar registros de cambios en los datos. Sin embargo, en la mayoría de los casos, recomendamos que transmitas los cambios con Dataflow en lugar de usar los métodos descritos en esta página, ya que Dataflow se encarga de las divisiones y combinaciones de partición por ti.

Antes de comenzar

Antes de leer un flujo de cambios con Java, asegúrate de estar familiarizado con la descripción general de los flujos de cambios. Luego, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Instala Google Cloud CLI.
  2. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  3. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login

Para obtener más información, consulta Set up authentication for a local development environment.

Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Cómo habilitar un flujo de cambios

Debes habilitar un flujo de cambios en una tabla antes de poder leerla. También puedes crear una tabla nueva con un flujo de cambios habilitado.

Roles obligatorios

A fin de obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pídele a tu administrador que te otorgue el siguiente rol de IAM.

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla desde la que planeas transmitir los cambios.

Agrega la biblioteca cliente de Java como dependencia

Agrega un código similar al siguiente al archivo pom.xml de Maven. Reemplaza VERSION por la versión de la biblioteca cliente que usas. La versión debe ser 2.21.0 o posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determina las particiones de la tabla

Para comenzar a realizar solicitudes ReadChangeStream, debes conocer las particiones de tu tabla. Esto se puede determinar con el método GenerateInitialChangeStreamPartitions. En el siguiente ejemplo, se muestra cómo usar este método para obtener un flujo de ByteStringRanges que representa cada partición de la tabla. Cada ByteStringRange contiene la clave de inicio y de finalización de una partición.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Cambios del proceso para cada partición

Luego, puedes procesar cambios para cada partición con el método ReadChangeStream. Este es un ejemplo de cómo abrir una transmisión para una partición a partir de la hora actual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery acepta los siguientes argumentos:

  • Partición de transmisión (obligatorio): La partición desde la que se transmiten los cambios
  • Uno de los siguientes:
    • Hora de inicio: Confirma una marca de tiempo desde la que se deben comenzar a procesar los cambios
    • Tokens de continuación: tokens que representan una posición desde la que se puede reanudar la transmisión.
  • Hora de finalización (opcional): Confirma una marca de tiempo para detener el procesamiento de los cambios cuando se alcancen. Si no proporcionas un valor, la transmisión continúa leyendo.
  • Duración de la señal de monitoreo de funcionamiento (opcional): Es la frecuencia de los mensajes de señal de monitoreo de funcionamiento cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos).

Cambia el formato del registro de la transmisión

Un registro de flujos de cambios que se muestra es uno de los tres tipos de respuesta siguientes:

  • ChangeStreamMutation: Es un mensaje que representa un registro de cambios en los datos.

  • CloseStream: Es un mensaje que indica que el cliente debe dejar de leer desde la transmisión.

    • Estado: Indica el motivo por el que se cierra la transmisión. Uno de los siguientes:
      • OK: Se alcanzó la hora de finalización para la partición especificada.
      • OUT_OF_RANGE: La partición determinada ya no existe, lo que significa que se realizaron divisiones o combinaciones en esta partición. Se deberá crear una solicitud ReadChangeStream nueva para cada partición nueva.
    • NewPartitions: Proporciona la información de partición actualizada sobre las respuestas de OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: Es la lista de tokens que se usan para reanudar las solicitudes nuevas de ReadChangeStream desde la misma posición. Uno por NewPartition.
  • Heartbeat: Es un mensaje periódico con información que se puede usar para controlar el estado de la transmisión.

    • EstimatedLowWatermark: Estimación de la marca de agua baja de la partición determinada
    • ContinuationToken: Es el token para reanudar la transmisión de la partición determinada desde la posición actual.

Contenido del registro de cambios de datos

Para obtener información sobre los registros de flujos de cambios, consulta Qué hay en un registro de cambios de datos.

Cómo controlar cambios en particiones

Cuando las particiones de una tabla cambian, las solicitudes ReadChangeStream muestran un mensaje CloseStream con la información necesaria para reanudar la transmisión desde las particiones nuevas.

En una división, esto contendrá varias particiones nuevas y un ContinuationToken correspondiente para cada partición. Para reanudar la transmisión de las particiones nuevas desde la misma posición, debes realizar una solicitud ReadChangeStream nueva para cada partición nueva con su token correspondiente.

Por ejemplo, si transmites la partición [A,C) y esta se divide en dos particiones, [A,B) y [B,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

En el caso de una combinación, para reanudar desde la misma partición, debes enviar una solicitud ReadChangeStream nueva que contenga cada token de las particiones combinadas.

Por ejemplo, si transmites dos particiones, [A,B) y [B,C), que se combinan en la partición [A,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para reanudar la partición de transmisión [A, C) desde la misma posición, envía un ReadChangeStreamQuery como el siguiente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

¿Qué sigue?