Crear conexiones de flujos de cambios con Dataflow

En esta página, se muestra cómo crear canalizaciones de Dataflow que consumen y reenvían datos de cambio de Spanner mediante flujos de cambios. Puedes usar el código de ejemplo de esta página para compilar canalizaciones personalizadas.

Conceptos básicos

A continuación, se incluyen algunos conceptos principales de las canalizaciones de Dataflow para flujos de cambios.

Dataflow

Dataflow es un servicio sin servidores, rápido y rentable que admite el procesamiento de transmisión y por lotes. Proporciona portabilidad con trabajos de procesamiento escritos con las bibliotecas de Apache Beam de código abierto y automatiza el aprovisionamiento de infraestructura y la administración de clústeres. Dataflow proporciona transmisión casi en tiempo real, con aproximadamente seis segundos de latencia cuando se lee desde flujos de cambios.

Puedes usar Dataflow para consumir los flujos de cambios de Spanner con el conector Spanner, que ofrece una abstracción de la API de Spanner para consultar flujos de cambios. Con este conector, no tienes que administrar el ciclo de vida de la partición de los flujos de cambios, lo cual es necesario cuando usas la API de Spanner directamente. El conector te proporciona una transmisión de registros de cambios de datos para que puedas enfocarte más en la lógica de la aplicación y menos en los detalles específicos de la API y la partición dinámica de flujos de cambios. Recomendamos usar el conector de SpannerIO en lugar de la API de Spanner en la mayoría de los casos en los que necesites leer datos de transmisión de cambios.

Las plantillas de Dataflow son canalizaciones de Dataflow compiladas que implementan casos de uso comunes. Consulta Plantillas de Dataflow para obtener una descripción general.

Canalización de Dataflow

Una canalización de Dataflow con flujos de cambios de Spanner se compone de cuatro partes:

  1. Una base de datos de Spanner con un flujo de cambios
  2. El conector de SpannerIO
  3. Transformaciones y receptores definidos por el usuario
  4. Escritor de E/S para receptores

imagen

Cada una de ellas se analiza con más detalle a continuación.

Flujo de cambios de Spanner

Para obtener detalles sobre cómo crear un flujo de cambios, consulta Cómo crear un flujo de cambios.

Conector de SpannerIO de Apache Beam

Este es el conector de SpannerIO que se describió antes. Es un conector de E/S de origen que emite un PCollection de registros de cambios de datos a las etapas posteriores de la canalización. El tiempo del evento para cada registro de cambio de datos emitido será la marca de tiempo de confirmación. Ten en cuenta que los registros emitidos están desordenados y que el conector SpannerIO garantiza que no habrá registros tardíos.

Cuando se trabaja con flujos de cambios, Dataflow usa puntos de control. Como resultado, cada trabajador podría esperar hasta cinco segundos mientras se almacena en búfer los cambios antes de enviar los cambios para su procesamiento posterior. Se espera una latencia de aproximadamente seis segundos.

Transformaciones definidas por el usuario

Una transformación definida por el usuario le permite a este agregar, transformar o modificar datos de procesamiento dentro de una canalización de Dataflow. Los casos de uso comunes para esto son la eliminación de información de identificación personal, el cumplimiento de los requisitos de formato de datos descendentes y el ordenamiento. Consulta la documentación oficial de Apache Beam para obtener la guía de programación sobre transforms.

Escritor de E/S del receptor de Apache Beam

Apache Beam contiene transformaciones de E/S integradas que se pueden usar para escribir desde una canalización de Dataflow en un receptor de datos como BigQuery. La mayoría de los receptores de datos son compatibles de forma nativa.

Plantillas de Dataflow

Las plantillas de Dataflow proporcionan una manera fácil de crear trabajos de Dataflow basados en imágenes de Docker compiladas previamente para casos prácticos comunes mediante la consola de Google Cloud, la CLI de Google Cloud o las llamadas a la API de REST.

Para los flujos de cambios de Spanner, proporcionamos tres plantillas de Dataflow Flex:

Crea una canalización de Dataflow

En esta sección, se describe la configuración inicial del conector y se proporcionan muestras de integraciones comunes con la funcionalidad de flujos de cambios de Spanner.

Para seguir estos pasos, necesitas un entorno de desarrollo de Java para Dataflow. Para obtener más información, consulta Crea una canalización de Dataflow con Java.

Crear transmisión de cambios

Si deseas obtener detalles para crear un flujo de cambios, consulta Cómo crear un flujo de cambios. Para continuar con los próximos pasos, debes tener una base de datos de Spanner con un flujo de cambios configurado.

Otorga privilegios de control de acceso detallados

Si esperas que algún usuario de control de acceso detallado ejecute el trabajo de Dataflow, asegúrate de que los usuarios tengan acceso a una función de base de datos que tenga el privilegio SELECT en el flujo de cambios y el privilegio EXECUTE en la función con valor de tabla del flujo de cambios. Además, asegúrate de que la principal especifique el rol de base de datos en la configuración de SpannerIO o en la plantilla flexible de Dataflow.

Para obtener más información, consulta Acerca del control de acceso detallado.

Agrega el conector de SpannerIO como dependencia

El conector SpannerIO de Apache Beam encapsula la complejidad de consumir los flujos de cambios directamente a través de la API de Cloud Spanner y emite una PCollection de registros de datos de flujos de cambios para etapas posteriores de la canalización.

Estos objetos se pueden consumir en otras etapas de la canalización de Dataflow del usuario. La integración de flujos de cambios forma parte del conector de SpannerIO. Para poder usar el conector de SpannerIO, debes agregar la dependencia al archivo pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Crea una base de datos de metadatos

El conector debe realizar un seguimiento de cada partición cuando ejecuta la canalización de Apache Beam. Mantiene estos metadatos en una tabla de Spanner que crea el conector durante la inicialización. Especifica la base de datos en la que se creará esta tabla cuando configures el conector.

Como se describe en Prácticas recomendadas sobre los flujos de cambios, recomendamos crear una base de datos nueva para este propósito, en lugar de permitir que el conector use la base de datos de tu aplicación para almacenar su tabla de metadatos.

El propietario de un trabajo de Dataflow que usa el conector SpannerIO debe tener los siguientes permisos de IAM configurados con esta base de datos de metadatos:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura el conector

El conector de flujos de cambios de Spanner se puede configurar de la siguiente manera:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Las siguientes son descripciones de las opciones de readChangeStream():

Configuración de Spanner (obligatorio)

Se usa para configurar el proyecto, la instancia y la base de datos donde se creó el flujo de cambios y desde donde se debe realizar consultas. De manera opcional, también especifica el rol de base de datos que se debe usar cuando la principal de IAM que ejecuta el trabajo de Dataflow es un usuario de control de acceso detallado. El trabajo asume este rol de base de datos para acceder al flujo de cambios. Para obtener más información, consulta Acerca del control de acceso detallado.

Nombre del flujo de cambios (obligatorio)

Este nombre identifica de forma única el flujo de cambios. Este nombre debe ser el mismo que se usó cuando se creó.

ID de la instancia de metadatos (opcional)

Esta es la instancia en la que se almacenarán los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.

ID de la base de datos de metadatos (obligatorio)

Esta es la base de datos en la que se almacenarán los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.

Nombre de la tabla de metadatos (opcional)

Solo debe usarse cuando se actualiza una canalización existente.

Este es el nombre de la tabla de metadatos preexistente que usará el conector. El conector la usa para almacenar los metadatos y controlar el consumo de los datos de la API de flujo de cambios. Si se omite esta opción, Spanner crea una tabla nueva con un nombre generado cuando se inicializa el conector.

Prioridad de RPC (opcional)

La prioridad de solicitud que se usará para las consultas de flujos de cambios. Si se omite este parámetro, se usará high priority.

InclusiveStartAt (obligatorio)

Los cambios de la marca de tiempo dada se muestran al llamador.

InclusiveEndAt (opcional)

Los cambios realizados hasta la marca de tiempo determinada se muestran al llamador. Si se omite este parámetro, los cambios se emitirán de forma indefinida.

Agrega transformaciones y receptores para procesar datos de cambios

Una vez completados los pasos anteriores, el conector de SpannerIO configurado está listo para emitir una PCollection de objetos DataChangeRecord. Consulta Ejemplos de transformaciones y receptores para ver varias opciones de configuración de canalización de muestra que procesan estos datos transmitidos de distintas maneras.

Ten en cuenta que los registros de transmisión de cambios emitidos por el conector SpannerIO no están ordenados. Esto se debe a que las PCollections no proporcionan ninguna garantía de orden. Si necesitas una transmisión ordenada, debes agrupar y ordenar los registros como transformaciones en tus canalizaciones. Consulta Muestra: Ordena por clave. Puedes ampliar esta muestra para ordenar los registros según cualquier campo de los registros, por ejemplo, por ID de transacción.

Ejemplos de transformaciones y receptores

Puedes definir tus propias transformaciones y especificar los receptores en los que deseas escribir los datos. La documentación de Apache Beam proporciona una gran cantidad de transforms que se pueden aplicar, además de conectores de E/S listas para escribir los datos en sistemas externos.

Muestra: Ordena por clave

Esta muestra de código emite registros de cambios de datos ordenados por marca de tiempo de confirmación y agrupados por claves primarias con el conector de Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

En esta muestra de código, se usan estados y temporizadores para almacenar en búfer los registros de cada clave, y se establece la hora de vencimiento del temporizador en alguna hora configurada por el usuario T en el futuro (definida en la función BufferKeyUntilOutputTimestamp). Cuando la marca de agua de Dataflow pasa el tiempo T, este código limpia todos los registros del búfer con una marca de tiempo menor que T, ordena estos registros por marca de tiempo de confirmación y genera un par clave-valor en el que se muestra lo siguiente:

  • La clave es la de entrada, que es la clave primaria con hash para un array de bucket de tamaño 1,000.
  • El valor son los registros de cambios de datos ordenados que se almacenaron en búfer para la clave.

Para cada clave, tenemos las siguientes garantías:

  • Se garantiza que los temporizadores se activarán en el orden de la marca de tiempo de vencimiento.
  • Se garantiza que las etapas posteriores recibirán los elementos en el mismo orden en que se produjeron.

Por ejemplo, supongamos que para una clave con el valor 100, el temporizador se activa en T1 y T10 respectivamente, lo que produce un conjunto de registros de cambio de datos en cada marca de tiempo. Debido a que los registros de cambio de datos que se muestran en T1 se produjeron antes que los registros de cambio de datos que se mostraron en T10, también se garantiza que los registros de cambio de datos que se muestran en T1 se recibirán en la siguiente etapa antes de que los registros de cambio de datos se muestren en T10. Este mecanismo nos ayuda a garantizar un orden estricto de marcas de tiempo de confirmación por clave primaria para el procesamiento posterior.

Este proceso se repetirá hasta que finalice la canalización y se hayan procesado todos los registros de cambios de datos (o se repetirá de forma indefinida si no se especifica una hora de finalización).

Ten en cuenta que esta muestra de código usa estados y temporizadores, en lugar de ventanas, para realizar el ordenamiento por clave. La lógica es que no se garantiza que las ventanas se procesen en orden. Esto significa que las ventanas más antiguas pueden procesarse después que las más recientes, lo que podría causar un procesamiento desordenado.

BreakRecordByModFn

Cada registro de cambios de datos puede contener varias modificaciones. Cada modificación representa una inserción, actualización o eliminación de un solo valor de clave primaria. Esta función divide cada registro de cambios de datos en registros separados, uno por modificación.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Esta función toma un DataChangeRecord y genera un DataChangeRecord vinculado por la clave primaria de Spanner a un valor de número entero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Los temporizadores y los búferes son por clave. Esta función almacena en búfer cada registro de cambio de datos hasta que la marca de agua pasa la marca de tiempo en la que queremos generar los registros de cambio de datos almacenados en búfer.

El siguiente código utiliza un temporizador en bucle para determinar cuándo vaciar el búfer:

  1. Cuando ve un registro de cambios de datos de una clave por primera vez, establece el temporizador para que se active en la marca de tiempo de confirmación del registro de cambios de datos + incrementIntervalSeconds (una opción que puede configurar el usuario).
  2. Cuando se activa el temporizador, se agregan todos los registros de cambios de datos del búfer con una marca de tiempo menor que el tiempo de vencimiento del temporizador en recordsToOutput. Si el búfer tiene registros de cambios de datos cuya marca de tiempo es mayor o igual que el tiempo de vencimiento del temporizador, vuelve a agregar esos registros de cambios de datos al búfer en lugar de emitirlos. Luego, establece el próximo temporizador en la hora de vencimiento del temporizador actual más incrementIntervalInSeconds.
  3. Si recordsToOutput no está vacío, la función ordena los registros de cambios de datos en recordsToOutput por marca de tiempo de confirmación y el ID de transacción y, luego, los muestra.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Cómo pedir transacciones

Esta canalización se puede cambiar para ordenar por ID de transacción y marca de tiempo de confirmación. Para hacerlo, almacena los registros en búfer para cada ID de transacción / par de marca de tiempo de confirmación, en lugar de cada clave de Spanner. Esta acción requiere modificar el código de KeyByIdFn.

Muestra: Ensamblar transacciones

Esta muestra de código lee los registros de cambios de datos, ensambla todos los registros de cambios de datos que pertenecen a la misma transacción en un solo elemento y genera ese elemento. Ten en cuenta que las transacciones que genera este código de muestra no se ordenan por marca de tiempo de confirmación.

Esta muestra de código usa búferes para ensamblar transacciones a partir de registros de cambios de datos. Cuando recibes un registro de cambios de datos que pertenece a una transacción por primera vez, lee el campo numberOfRecordsInTransaction en el registro de cambios de datos, que describe la cantidad esperada de registros de cambios de datos pertenecientes a esa transacción. Almacena en búfer los registros de cambios de datos que pertenecen a esa transacción hasta que la cantidad de registros almacenados en búfer coincida con numberOfRecordsInTransaction, lo que genera los registros de cambio de datos agrupados.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Esta función toma un DataChangeRecord y da como resultado un DataChangeRecord codificado por el ID de transacción.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Los búferes TransactionBoundaryFn recibieron pares clave-valor de {TransactionId, DataChangeRecord} de KeyByTransactionIdFn y los almacenaron en el búfer en grupos basados en TransactionId. Cuando la cantidad de registros almacenados en búfer es igual a la cantidad de registros contenidos en toda la transacción, esta función ordena los objetos DataChangeRecord del grupo por secuencia de registros y genera un par clave-valor de {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

En este caso, suponemos que SortKey es una clase definida por el usuario que representa un par {CommitTimestamp, TransactionId}. Consulta la implementación de muestra para SortKey.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Muestra: Filtra por etiqueta de transacción

Cuando se etiqueta una transacción que modifica los datos del usuario, la etiqueta correspondiente y su tipo se almacenan como parte de DataChangeRecord. Estos ejemplos demuestran cómo filtrar registros de flujos de cambios según las etiquetas de transacción definidas por el usuario y las etiquetas del sistema:

Filtrado de etiquetas definido por el usuario para my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtrado de etiquetas del sistema/auditoría de TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Muestra: Obtener fila completa

Este ejemplo funciona con una tabla de Spanner llamada Singer que tiene la siguiente definición:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

En el modo de captura de valores OLD_AND_NEW_VALUES predeterminado de los flujos de cambios, cuando haya una actualización en una fila de Spanner, el registro de cambios en los datos recibido contendrá solo las columnas que se modificaron. Las columnas con seguimiento que no se hayan modificado no se incluirán en el registro. La clave primaria de la modificación se puede usar para realizar una lectura de instantánea de Spanner en la marca de tiempo de confirmación del registro de cambios de datos para recuperar las columnas sin cambios o incluso recuperar la fila completa.

Ten en cuenta que es posible que debas cambiar la política de retención de la base de datos a un valor mayor o igual que la política de retención de flujos de cambios para que la instantánea se lea correctamente.

Además, ten en cuenta que usar el tipo de captura de valor NEW_ROW es la forma recomendada y más eficiente de hacerlo, ya que muestra todas las columnas con seguimiento de la fila de forma predeterminada y no requiere una lectura de instantánea adicional en Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Esta transformación realizará una lectura inactiva en la marca de tiempo de confirmación de cada registro recibido y asignará la fila completa a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Este código crea un cliente de base de datos de Spanner para realizar la recuperación de filas completa y configura el grupo de sesiones para que tenga solo algunas sesiones, y realiza operaciones de lectura en una instancia de ToFullRowJsonFn de forma secuencial. Dataflow se asegura de generar muchas instancias de esta función, cada una con su propio grupo de clientes.

Muestra: Spanner a Pub/Sub

En esta situación, el emisor transmite registros a Pub/Sub lo más rápido posible, sin ninguna agrupación ni agregación. Esta es una buena opción para activar procesamientos posteriores, como transmitir todas las filas nuevas insertadas en una tabla de Spanner a Pub/Sub para su procesamiento posterior.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Ten en cuenta que el receptor de Pub/Sub se puede configurar para garantizar una semántica de exactamente una vez.

Muestra: Spanner a Cloud Storage

En esta situación, el llamador agrupa todos los registros dentro de una ventana determinada y guarda el grupo en archivos de Cloud Storage separados. Esta es una buena opción para las estadísticas y el archivo de un momento determinado, que es independiente del período de retención de Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Ten en cuenta que el receptor de Cloud Storage proporciona semántica al menos una vez de forma predeterminada. Con procesamiento adicional, se puede modificar para que tenga semántica del tipo “exactamente una vez”.

También proporcionamos una plantilla de Dataflow para este caso de uso: consulta Conecta flujos de cambios a Cloud Storage.

Muestra: Spanner a BigQuery (tabla de registro)

Aquí, el llamador transmite registros de cambio a BigQuery. Cada registro de cambios en los datos se refleja como una fila en BigQuery. Esta es una buena opción para las estadísticas. Este código usa las funciones definidas anteriormente en la sección Recuperar fila completa para recuperar la fila completa del registro y escribirla en BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Ten en cuenta que el receptor de BigQuery proporciona semántica del tipo “al menos una vez” de forma predeterminada. Con procesamiento adicional, se puede modificar para que tenga semántica del tipo “exactamente una vez”.

También proporcionamos una plantilla de Dataflow para este caso de uso. Consulta Conecta flujos de cambios a BigQuery.

Supervisa una canalización

Hay dos clases de métricas disponibles para supervisar una canalización de Dataflow de flujos de cambios.

Métricas estándar de Dataflow

Dataflow proporciona varias métricas para asegurarse de que su trabajo esté en buen estado, como la actualidad de los datos, el retraso del sistema, la capacidad de procesamiento del trabajo, el uso de CPU del trabajador y mucho más. Puedes encontrar más información en Usa Monitoring para las canalizaciones de Dataflow.

En el caso de las canalizaciones de flujos de cambios, hay dos métricas principales que deben tenerse en cuenta: la latencia del sistema y la actualización de datos.

La latencia del sistema te indicará el tiempo máximo actual (en segundos) durante el cual un elemento de datos se procesa o está en espera de procesamiento.

La actualidad de los datos te mostrará el tiempo transcurrido desde ahora (en tiempo real) hasta la marca de agua de salida. La marca de agua de salida con la hora T indica que todos los elementos con una hora de evento (estrictamente) anterior a T se procesaron para el procesamiento. En otras palabras, la actualidad de los datos mide qué tan actualizada está la canalización con respecto al procesamiento de los eventos que recibió.

Si la canalización no tiene recursos suficientes, puedes ver ese efecto en estas dos métricas. La latencia del sistema aumentará porque los elementos deben esperar más tiempo para procesarse. La actualidad de los datos también aumentará, porque la canalización no podrá mantener el ritmo de la cantidad de datos recibidos.

Métricas personalizadas del flujo de cambios

Estas métricas se exponen en Cloud Monitoring y, además, incluyen lo siguiente:

  • Latencia agrupada (histograma) entre un registro que el conector confirma en Spanner y que emite en una PCollection. Esta métrica se puede usar para ver cualquier problema de rendimiento (latencia) con la canalización.
  • Cantidad total de registros de datos leídos. Esta es una indicación general de la cantidad de registros emitidos por el conector. Esta cantidad debería aumentar de manera constante y reflejar la tendencia de escrituras en la base de datos subyacente de Spanner.
  • Cantidad de particiones que se leen actualmente. Siempre debería haber particiones en proceso de lectura. Si este número es cero, indica que se produjo un error en la canalización.
  • Cantidad total de consultas emitidas durante la ejecución del conector. Esta es una indicación general de las consultas de flujos de cambios realizadas a la instancia de Spanner durante la ejecución de la canalización. Se puede usar para obtener una estimación de la carga del conector a la base de datos de Spanner.

Actualiza una canalización existente

Es posible actualizar una canalización en ejecución que use el conector SpannerIO para procesar flujos de cambios si se aprueban las verificaciones de compatibilidad del trabajo. Para ello, debes configurar de forma explícita el parámetro del nombre de la tabla de metadatos del trabajo nuevo cuando lo actualices. Usa el valor de la opción de canalización metadataTable del trabajo que estás actualizando.

Si usas una plantilla de Dataflow proporcionada por Google, establece el nombre de la tabla con el parámetro spannerMetadataTableName. También puedes modificar tu trabajo existente para usar de forma explícita la tabla de metadatos con el método withMetadataTable(your-metadata-table-name) en la configuración del conector. Una vez hecho esto, puedes seguir las instrucciones en Inicia tu trabajo de reemplazo desde los documentos de Dataflow para actualizar un trabajo en ejecución.

Prácticas recomendadas para flujos de cambios y Dataflow

Las siguientes son algunas prácticas recomendadas para compilar conexiones de flujos de cambios mediante Dataflow.

Usa una base de datos de metadatos independiente

Recomendamos crear una base de datos separada para que el conector de SpannerIO la use como almacenamiento de metadatos, en lugar de configurarla con el objetivo de usar la base de datos de tu aplicación.

Para obtener más información, consulta Considera una base de datos de metadatos independiente.

Cómo dimensionar tu clúster

Una regla general para una cantidad inicial de trabajadores en un trabajo de flujos de cambios de Spanner es un trabajador por cada 1,000 escrituras por segundo. Ten en cuenta que esta estimación puede variar en función de varios factores, como el tamaño de cada transacción, la cantidad de registros de flujos de cambios que se producen a partir de una sola transacción y otras transformaciones, agregaciones o receptores que se usan en la canalización.

Después de adquirir los recursos iniciales, es importante hacer un seguimiento de las métricas mencionadas en Supervisa una canalización para asegurarte de que la canalización esté en buen estado. Recomendamos experimentar con un tamaño inicial del grupo de trabajadores y supervisar cómo la canalización se encarga de la carga y aumentar la cantidad de nodos si es necesario. El uso de CPU es una métrica clave para verificar si la carga es adecuada y si se necesitan más nodos.

Limitaciones conocidas

Ajuste de escala automático

La compatibilidad con el ajuste de escala automático para cualquier canalización que incluya SpannerIO.readChangeStream requiere Apache Beam 2.39.0 o superior.

Si usas una versión de Apache Beam anterior a 2.39.0, las canalizaciones que incluyen SpannerIO.readChangeStream deben especificar de forma explícita el algoritmo de ajuste de escala automático como NONE, como se describe en Ajuste de escala automático horizontal.

Para escalar de forma manual una canalización de Dataflow en lugar de usar el ajuste de escala automático, consulta Escala de forma manual una canalización de transmisión.

Runner V2

El conector de flujos de cambios de Spanner requiere Dataflow Runner V2. Debe especificarse de forma manual durante la ejecución; de lo contrario, se arrojará un error. Puedes especificar Runner V2 si configuras tu trabajo con --experiments=use_unified_worker,use_runner_v2.

Instantánea

El conector de flujos de cambios de Spanner no admite instantáneas de Dataflow.

Desviando

El conector de flujos de cambios de Spanner no admite el desvío de un trabajo. Solo se puede cancelar un trabajo existente.

También puedes actualizar una canalización existente sin necesidad de detenerla.

OpenCensus

Si deseas usar OpenCensus para supervisar tu canalización, especifica la versión 0.28.3 o una posterior.

NullPointerException cuando se inicia la canalización

Un error en la versión 2.38.0 de Apache Beam puede causar un NullPointerException cuando se inicia la canalización en ciertas condiciones. Esto evitaría que tu trabajo se inicie y, en su lugar, mostrará este mensaje de error:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Para solucionar este problema, usa la versión 2.39.0 de Apache Beam o una versión posterior, o bien especifica de forma manual la versión de beam-sdks-java-core como 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Más información