Transmite cambios con Dataflow

El conector de Bigtable Beam te permite usar Dataflow para leer registros de cambios de datos de Bigtable sin necesidad de rastrear partición de proceso cambios en las código, ya que el conector maneja esa lógica por ti.

En este documento, se describe cómo configurar y usar el conector de Bigtable Beam para leer un flujo de cambios con una canalización de Dataflow. Antes de leer este documento, debes leer la Descripción general de los cambios y familiarizarse con Dataflow.

Alternativas a compilar tu propia canalización

Si no quieres crear tu propio Dataflow por lotes, puedes usar una de las siguientes opciones.

Puedes usar una plantilla de Dataflow proporcionada por Google.

También puedes usar las muestras de código del instructivo de Bigtable o como punto de partida para tu código.

Asegúrate de que el código que generes utilice google cloud libraries-bom 26.14.0 o una versión posterior

Detalles del conector

El método del conector de Bigtable Beam, BigtableIO.readChangeStream, te permite leer una transmisión de datos registros de cambios (ChangeStreamMutation) que puedes procesar. El conector de Bigtable Beam es un componente de la instancia de GitHub de Apache Beam Cloud Storage. Para obtener una descripción del código del conector, consulta los comentarios en BigtableIO.java

Debes usar el conector con Beam 2.48.0 o una versión posterior. Consulta Apache Beam tiempo de ejecución para asegurarte de que estás usando una versión compatible de Java. Luego, puedes implementar una canalización usa el conector para Dataflow, que controla el aprovisionamiento y la administración de recursos y ayuda con la escalabilidad y confiabilidad de los datos de transmisión de datos.

Para obtener más información sobre el modelo de programación de Apache Beam, consulta la Documentación de Beam.

Agrupa datos sin horarios de evento

Los registros de cambios de datos transmitidos con el conector de Bigtable Beam no son compatibles con Dataflow. que dependen del tiempo del evento.

Como se explica en Replicación y marcas de agua, Es posible que la marca de agua baja no avance si no se detecta la replicación de la partición. al resto de la instancia. Cuando una marca de agua baja deja de avanzar, puede el flujo de cambios se paraliza.

Para evitar que se detenga la transmisión, el conector de Bigtable Beam genera todos los datos con una marca de tiempo de salida de cero. La marca de tiempo de cero hace que Dataflow considere todos los datos cambiar registros para que sean datos tardíos. Como resultado, las funciones de Dataflow que dependen de la hora del evento no compatibles con los flujos de cambios de Bigtable. En concreto, no puedes usar funciones analíticas, activadores de tiempo de evento, o temporizadores de tiempo de evento

En cambio, puedes usar GlobalWindows con activadores que no son de tiempo de eventos para agrupar estos datos tardíos en paneles, como se demostró en el ejemplo del instructivo. Para obtener detalles sobre activadores y paneles, consulta Activadores en la guía de programación de Beam.

Ajuste de escala automático

El conector admite Ajuste de escala automático de Dataflow, que se habilita de forma predeterminada cuando se usan Runner v2 (obligatorio). El algoritmo de ajuste de escala automático de Dataflow tiene en cuenta el trabajo pendiente del flujo de cambios estimado, que se puede supervisar Supervisión de Dataflow de la sección Backlog. Usa la marca --maxNumWorkers cuando implementes un para limitar el número de trabajadores.

Para escalar tu canalización de forma manual en lugar de usar el ajuste de escala automático, consulta Escala de forma manual una canalización de transmisión.

Limitaciones

Ten en cuenta las siguientes limitaciones antes de usar el conector de Bigtable Beam con Dataflow.

Ejecutor de Dataflow V2

El conector solo se puede ejecutar usando Dataflow Runner v2. Para habilitar esto, especifica --experiments=use_runner_v2 en tu línea de comandos argumentos. Ejecutar con Runner v1 hace que tu canalización falle con el siguiente excepción:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Instantáneas

El conector no es compatible Instantáneas de Dataflow.

Duplicados

El conector de Bigtable Beam transmite los cambios para cada clave de fila y cada clúster en orden de marca de tiempo de confirmación, pero como a veces se reinicia desde en momentos más tempranos en la transmisión, puede producir duplicados.

Antes de comenzar

Antes de usar el conector, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Para obtener más información, consulta Set up authentication for a local development environment.

Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Cómo habilitar un flujo de cambios

Debes Cómo habilitar un flujo de cambios sobre una mesa antes de que puedas leerlos. También puedes crear una tabla nueva con flujos de cambios habilitados.

Tabla de metadatos de flujos de cambios

Cuando transmites cambios con Dataflow, El conector de Bigtable Beam crea una tabla de metadatos con el nombre __change_stream_md_table de forma predeterminada. La tabla de metadatos del flujo de cambios administra el estado operativo del conector y almacena los metadatos sobre los cambios en los datos registros.

De forma predeterminada, el conector crea la tabla en la misma instancia que la tabla que se está transmitiendo. Para garantizar que la tabla funcione correctamente, el perfil de la app de la de metadatos debe usar enrutamiento de un solo clúster y tener una sola fila transacciones habilitadas.

Para obtener más información sobre la transmisión de cambios desde Bigtable con el conector de Bigtable Beam, consulta la BigtableIO documentación.

Roles obligatorios

A fin de obtener los permisos que necesitas para leer un cambio de Bigtable usando Dataflow, pídele a tu administrador que te otorgue el permiso los siguientes roles de IAM.

Para leer los cambios de Bigtable, necesitas este rol:

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla que planeas usar transmitir cambios de

Para ejecutar el trabajo de Dataflow, necesitas las siguientes funciones:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Agrega el conector de Bigtable Beam como una dependencia

Agrega un código similar al siguiente dependencia a tu archivo pom.xml de Maven. El debe ser 2.48.0 o posterior.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Cómo leer el flujo de cambios

Para crear una canalización de Dataflow que lea tus registros de cambios de datos, sigue estos pasos: configura el conector y, luego, agrega transformaciones y receptores. Luego, usarás para leer objetos ChangeStreamMutation en una canalización de Beam.

Las muestras de código de esta sección, escritas en Java, demuestran cómo compilar una y usarla para convertir pares clave-valor en una cadena. Cada par consiste de una clave de fila y un objeto ChangeStreamMutation. La canalización convierte cada las entradas de un objeto a una cadena separada por comas.

Compila la canalización

Esta muestra de código de Java demuestra cómo compilar la canalización:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Procesa los registros de cambios de datos

En este ejemplo, se indica cómo realizar un bucle a lo largo de todas las entradas en un registro de cambios de datos para una fila y llamar a un método de conversión de cadena según el tipo de entrada.

Para obtener una lista de los tipos de entrada que puede contener un registro de cambios en los datos, consulta Qué hay en un registro de cambios de datos

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

En este ejemplo, se convierte una entrada write:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

En esta muestra, se convierte una entrada de eliminación de celdas:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

En este ejemplo, se convierte una entrada de eliminación de una familia de columnas:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Supervisar

Los siguientes recursos de la consola de Google Cloud te permiten supervisar tus recursos de Google Cloud mientras ejecutas una canalización de Dataflow leer un flujo de cambios de Bigtable:

En particular, verifica las siguientes métricas:

  • En la página Monitoring de Bigtable, verifica lo siguiente: métricas:
      .
    • Datos de Uso de CPU por flujos de cambios en la métrica cpu_load_by_app_profile_by_method_by_table Muestra el flujo de cambios en el uso de CPU de tu clúster.
    • Uso del almacenamiento del flujo de cambios (bytes) (change_stream_log_used_bytes).
  • En la página de supervisión de Dataflow, marca data actualidad, que muestra la diferencia entre la hora actual y la una marca de agua. Debería durar alrededor de dos minutos, con aumentos ocasionales que son uno o dos minutos más. Si la métrica de actualidad de los datos es siempre más alta que ese umbral, es probable que tu canalización tenga pocos recursos y agregar más trabajadores de Dataflow. La actualidad de los datos no indican si los registros de cambios en los datos se procesan lentamente.
  • El processing_delay_from_commit_timestamp_MEAN de Dataflow puede indicarte el tiempo de procesamiento promedio de los registros de cambios de datos en el la duración del trabajo.

La métrica server/latencies de Bigtable no es útil cuando se necesita supervisar una canalización de Dataflow que lee un flujo de cambios de Bigtable, ya que refleja la solicitud de transmisión y no la latencia de procesamiento del registro de cambios en los datos. Latencia alta en un no significa que las solicitudes se procesen lentamente; significa la conexión estuvo abierta tanto tiempo.

¿Qué sigue?