Crear conexiones de flujos de cambios con Dataflow

En esta página se muestra cómo crear canalizaciones de Dataflow que consuman y reenvíen datos de cambios de Spanner mediante flujos de cambios. Puedes usar el código de ejemplo de esta página para crear canalizaciones personalizadas.

Conceptos básicos

A continuación, se indican algunos conceptos básicos de las canalizaciones de Dataflow para los flujos de cambios.

Dataflow

Dataflow es un servicio rápido, rentable y sin servidor que admite el procesamiento por lotes y en streaming. Ofrece portabilidad con las tareas de procesamiento escritas con las bibliotecas de código abierto Apache Beam y automatiza el aprovisionamiento de la infraestructura y la gestión de clústeres. Dataflow proporciona streaming casi en tiempo real al leer de flujos de cambios.

Puedes usar Dataflow para consumir flujos de cambios de Spanner con el conector SpannerIO, que ofrece una abstracción sobre la API de Spanner para consultar flujos de cambios. Con este conector, no tienes que gestionar el ciclo de vida de las particiones de los flujos de cambios, lo que es necesario cuando usas la API de Spanner directamente. El conector te proporciona un flujo de registros de cambios de datos para que puedas centrarte más en la lógica de la aplicación y menos en los detalles específicos de la API y en la partición dinámica del flujo de cambios. En la mayoría de los casos en los que necesite leer datos de un flujo de cambios, le recomendamos que utilice el conector SpannerIO en lugar de la API Spanner.

Las plantillas de Dataflow son flujos de procesamiento de Dataflow prediseñados que implementan casos prácticos habituales. Consulta la sección Plantillas de Dataflow para obtener una descripción general.

Flujo de procesamiento de Dataflow

Una canalización de Dataflow de flujos de cambios de Spanner se compone de cuatro partes principales:

  1. Una base de datos de Spanner con un flujo de cambios
  2. Conector SpannerIO
  3. Transformaciones y receptores definidos por el usuario
  4. Un writer de entrada/salida de sumidero de Apache Beam

imagen

Flujo de cambios de Spanner

Para obtener información sobre cómo crear un flujo de cambios, consulta Crear un flujo de cambios.

Conector SpannerIO de Apache Beam

Este es el conector SpannerIO descrito en la sección anterior de Dataflow. Es un conector de entrada/salida de origen que emite un PCollection de registros de cambios de datos a fases posteriores de la canalización. La hora del evento de cada registro de cambio de datos emitido será la marca de tiempo de la confirmación. Ten en cuenta que los registros emitidos no están ordenados y que el conector SpannerIO garantiza que no habrá registros tardíos.

Cuando se trabaja con flujos de cambios, Dataflow usa el checkpointing. Por lo tanto, cada trabajador puede esperar hasta el intervalo de punto de control configurado para almacenar en búfer los cambios antes de enviarlos para su posterior procesamiento.

Transformaciones definidas por el usuario

Una transformación definida por el usuario permite a los usuarios agregar, transformar o modificar datos de procesamiento en una canalización de Dataflow. Los casos de uso habituales son la retirada de información personal identificable, el cumplimiento de los requisitos de formato de datos posteriores y la ordenación. Consulta la documentación oficial de Apache Beam para ver la guía de programación sobre transformaciones.

Writer de E/de sink de Apache Beam

Apache Beam contiene conectores de E/S integrados que se pueden usar para escribir desde una canalización de Dataflow en un receptor de datos como BigQuery. La mayoría de los receptores de datos más habituales se admiten de forma nativa.

Plantillas de Dataflow

Las plantillas de Dataflow proporcionan un método para crear tareas de Dataflow basadas en imágenes de Docker prediseñadas para casos prácticos habituales mediante la consola Google Cloud , la CLI Google Cloud o llamadas a la API REST.

En el caso de los flujos de cambios de Spanner, ofrecemos tres plantillas flexibles de Dataflow:

Se aplican las siguientes restricciones cuando se usa la plantilla Spanner change streams to Pub/Sub:

Definir permisos de gestión de identidades y accesos para plantillas de Dataflow

Antes de crear una tarea de Dataflow con las tres plantillas flexibles que se indican, asegúrate de que tienes los permisos de gestión de identidades y accesos necesarios para las siguientes cuentas de servicio:

Si no tienes los permisos de gestión de identidades y accesos necesarios, debes especificar una cuenta de servicio de trabajador gestionada por el usuario para crear el trabajo de Dataflow. Para obtener más información, consulta Seguridad y permisos de los flujos de datos.

Si intentas ejecutar una tarea desde una plantilla flexible de Dataflow sin todos los permisos necesarios, es posible que la tarea falle y se muestre un error al leer el archivo de resultados o un error de permiso denegado en el recurso. Para obtener más información, consulta el artículo Solucionar problemas de plantillas flexibles.

Crear una canalización de Dataflow

En esta sección se explica la configuración inicial del conector y se proporcionan ejemplos de integraciones habituales con la función de flujos de cambios de Spanner.

Para seguir estos pasos, necesitas un entorno de desarrollo de Java para Dataflow. Para obtener más información, consulta el artículo sobre cómo crear un flujo de procesamiento de Dataflow con Java.

Crear un flujo de cambios

Para obtener información sobre cómo crear un flujo de cambios, consulta Crear un flujo de cambios. Para continuar con los pasos siguientes, debe tener una base de datos de Spanner con un flujo de cambios configurado.

Conceder privilegios de control de acceso pormenorizado

Si quieres que los usuarios con control de acceso granular ejecuten el trabajo de Dataflow, asegúrate de que tengan acceso a un rol de base de datos que tenga el privilegio SELECT en el flujo de cambios y el privilegio EXECUTE en la función con valor de tabla del flujo de cambios. Asegúrate también de que la entidad de seguridad especifica el rol de base de datos en la configuración de SpannerIO o en la plantilla flexible de Dataflow.

Para obtener más información, consulta Información sobre el control de acceso pormenorizado.

Añade el conector SpannerIO como dependencia

El conector SpannerIO de Apache Beam encapsula la complejidad de consumir los flujos de cambios directamente mediante la API de Cloud Spanner, lo que emite una PCollection de registros de datos de flujo de cambios a fases posteriores de la canalización.

Estos objetos se pueden usar en otras fases de la canalización Dataflow del usuario. La integración de flujos de cambios forma parte del conector SpannerIO. Para poder usar el conector SpannerIO, debes añadir la dependencia al archivo pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Crear una base de datos de metadatos

El conector debe hacer un seguimiento de cada partición al ejecutar la canalización de Apache Beam. Estos metadatos se almacenan en una tabla de Spanner que crea el conector durante la inicialización. Cuando configures el conector, especifica la base de datos en la que se creará esta tabla.

Como se describe en las prácticas recomendadas para los flujos de cambios, te recomendamos que crees una base de datos para este fin en lugar de permitir que el conector use la base de datos de tu aplicación para almacenar su tabla de metadatos.

El propietario de un trabajo de Dataflow que utilice el conector SpannerIO debe tener los siguientes permisos de gestión de identidades y accesos definidos en esta base de datos de metadatos:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configurar el conector

El conector de flujos de cambios de Spanner se puede configurar de las siguientes formas:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

A continuación se describen las opciones de readChangeStream():

Configuración de Spanner (obligatorio)

Se usa para configurar el proyecto, la instancia y la base de datos en los que se creó el flujo de cambios y desde los que se debe consultar. También especifica de forma opcional el rol de base de datos que se debe usar cuando la entidad de gestión de identidades y accesos que ejecuta el trabajo de Dataflow es un usuario con control de acceso detallado. El trabajo asume este rol de base de datos para acceder al flujo de cambios. Para obtener más información, consulta Información sobre el control de acceso pormenorizado.

Nombre del flujo de cambios (obligatorio)

Este nombre identifica de forma exclusiva el flujo de cambios. El nombre debe ser el mismo que el que se usó al crearla.

ID de instancia de metadatos (opcional)

Es la instancia en la que se almacenan los metadatos que usa el conector para controlar el consumo de los datos de la API de flujo de cambios.

ID de la base de datos de metadatos (obligatorio)

Esta es la base de datos en la que se almacenan los metadatos que usa el conector para controlar el consumo de los datos de la API de flujo de cambios.

Nombre de la tabla de metadatos (opcional)

Solo debe usarse cuando se actualiza una canalización.

Este es el nombre de la tabla de metadatos preexistente que debe usar el conector. El conector la usa para almacenar los metadatos y controlar el consumo de los datos de la API Change Stream. Si se omite esta opción, Spanner crea una tabla con un nombre generado al inicializar el conector.

Prioridad de RPC (opcional)

La prioridad de la solicitud que se va a usar en las consultas de flujo de cambios. Si se omite este parámetro, se usará high priority.

InclusiveStartAt (obligatorio)

Los cambios realizados a partir de la marca de tiempo especificada se devuelven a la persona que llama.

InclusiveEndAt (opcional)

Se devuelven al llamador los cambios realizados hasta la marca de tiempo indicada. Si se omite este parámetro, los cambios se emitirán indefinidamente.

Añadir transformaciones y receptores para procesar datos de cambios

Una vez completados los pasos anteriores, el conector SpannerIO configurado estará listo para emitir una PCollection de objetos DataChangeRecord. Consulta Ejemplos de transformaciones y receptores para ver varias configuraciones de canalización de ejemplo que procesan estos datos de streaming de varias formas.

Ten en cuenta que los registros de flujo de cambios emitidos por el conector SpannerIO no están ordenados. Esto se debe a que las PCollections no ofrecen ninguna garantía de orden. Si necesitas una secuencia ordenada, debes agrupar y ordenar los registros como transformaciones en tus canalizaciones. Consulta el ejemplo: ordenar por clave. Puedes ampliar este ejemplo para ordenar los registros en función de cualquier campo de los registros, como los IDs de transacción.

Ejemplos de transformaciones y sumideros

Puedes definir tus propias transformaciones y especificar los receptores en los que se escribirán los datos. La documentación de Apache Beam proporciona una gran variedad de transformaciones que se pueden aplicar, así como conectores de E/S listos para usar que permiten escribir los datos en sistemas externos.

Ejemplo: ordenar por clave

Este código de ejemplo emite registros de cambios de datos ordenados por la marca de tiempo de confirmación y agrupados por claves principales mediante el conector de Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Este ejemplo de código usa estados y temporizadores para almacenar en búfer los registros de cada clave y define el tiempo de vencimiento del temporizador en un tiempo T configurado por el usuario en el futuro (definido en la función BufferKeyUntilOutputTimestamp). Cuando la marca de agua de Dataflow supera el tiempo T, este código vacía todos los registros del búfer con una marca de tiempo inferior a T, ordena estos registros por marca de tiempo de confirmación y genera un par clave-valor donde:

  • La clave es la clave de entrada, es decir, la clave principal cifrada en un array de cubos de tamaño 1000.
  • El valor son los registros de cambios de datos ordenados que se han almacenado en el búfer de la clave.

En cada clave, tenemos las siguientes garantías:

  • Los temporizadores se activan en el orden de la marca de tiempo de caducidad.
  • Se garantiza que las fases posteriores recibirán los elementos en el mismo orden en el que se produjeron.

Por ejemplo, con una clave de valor 100, el temporizador se activa a las T1 y a las T10 respectivamente, lo que genera un paquete de registros de cambios de datos en cada marca de tiempo. Como los registros de cambios de datos generados en T1 se produjeron antes que los registros de cambios de datos generados en T10, también se garantiza que la siguiente fase recibirá los registros de cambios de datos generados en T1 antes que los registros de cambios de datos generados en T10. Este mecanismo nos ayuda a garantizar un orden estricto de las marcas de tiempo de confirmación por clave principal para el procesamiento posterior.

Este proceso se repetirá hasta que finalice la canalización y se hayan procesado todos los registros de cambios de datos (o se repetirá indefinidamente si no se especifica ninguna hora de finalización).

Ten en cuenta que este código de ejemplo usa estados y temporizadores en lugar de ventanas para ordenar por clave. El motivo es que no se garantiza que las ventanas se procesen en orden. Esto significa que las ventanas más antiguas se pueden procesar más tarde que las más recientes, lo que podría provocar que el procesamiento no se realice en el orden correcto.

BreakRecordByModFn

Cada registro de cambio de datos puede contener varias modificaciones. Cada modificación representa una inserción, una actualización o una eliminación de un único valor de clave principal. Esta función desglosa cada registro de cambios de datos en registros independientes, uno por modificación.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Esta función toma un DataChangeRecord y genera un DataChangeRecord con la clave principal de Spanner cifrada en un valor entero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

Los temporizadores y los búferes son por clave. Esta función almacena en búfer cada registro de cambio de datos hasta que la marca de agua supera la marca de tiempo en la que queremos generar los registros de cambio de datos almacenados en búfer.

Este código utiliza un temporizador de bucle para determinar cuándo vaciar el búfer:

  1. Cuando ve por primera vez un registro de cambio de datos de una clave, define el temporizador para que se active en la marca de tiempo de confirmación del registro de cambio de datos + incrementIntervalSeconds (una opción que puede configurar el usuario).
  2. Cuando se activa el temporizador, añade a recordsToOutput todos los registros de cambios de datos del búfer cuya marca de tiempo sea anterior a la hora de vencimiento del temporizador. Si el búfer tiene registros de cambios de datos cuya marca de tiempo es mayor o igual que la hora de vencimiento del temporizador, añade esos registros de cambios de datos al búfer en lugar de generarlos. A continuación, se establece el siguiente temporizador con la hora de finalización del temporizador actual más incrementIntervalInSeconds.
  3. Si recordsToOutput no está vacío, la función ordena los registros de cambios de datos de recordsToOutput por marca de tiempo de confirmación e ID de transacción y, a continuación, los muestra.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Ordenar transacciones

Esta canalización se puede cambiar para que se ordene por ID de transacción y por marca de tiempo de confirmación. Para ello, almacena en búfer los registros de cada par de ID de transacción y marca de tiempo de confirmación, en lugar de cada clave de Spanner. Para ello, debes modificar el código en KeyByIdFn.

Ejemplo: ensamblar transacciones

Este código de ejemplo lee los registros de cambios de datos, agrupa todos los registros de cambios de datos que pertenecen a la misma transacción en un solo elemento y genera ese elemento. Ten en cuenta que las transacciones que genera este código de ejemplo no están ordenadas por la marca de tiempo de confirmación.

Este código de ejemplo usa búferes para ensamblar transacciones a partir de registros de cambios de datos. Al recibir por primera vez un registro de cambio de datos perteneciente a una transacción, lee el campo numberOfRecordsInTransaction del registro de cambio de datos, que describe el número esperado de registros de cambio de datos pertenecientes a esa transacción. Almacena en búfer los registros de cambios de datos pertenecientes a esa transacción hasta que el número de registros almacenados en búfer coincida con numberOfRecordsInTransaction, momento en el que genera los registros de cambios de datos agrupados.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Esta función recibe un DataChangeRecord y devuelve un DataChangeRecord con el ID de transacción como clave.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

Los búferes de TransactionBoundaryFn reciben pares clave-valor de {TransactionId, DataChangeRecord} de KeyByTransactionIdFn y los almacenan en grupos según TransactionId. Cuando el número de registros almacenados en el búfer es igual al número de registros que contiene toda la transacción, esta función ordena los objetos DataChangeRecord del grupo por secuencia de registros y genera un par clave-valor de {CommitTimestamp, TransactionId} y Iterable<DataChangeRecord>.

Aquí, suponemos que SortKey es una clase definida por el usuario que representa un par {CommitTimestamp, TransactionId}. Para obtener más información sobre SortKey, consulta la implementación de ejemplo.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Ejemplo: filtrar por etiqueta de transacción

Cuando se etiqueta una transacción que modifica los datos de usuario, la etiqueta correspondiente y su tipo se almacenan como parte de DataChangeRecord. En estos ejemplos se muestra cómo filtrar registros de flujo de cambios en función de etiquetas de transacción definidas por el usuario y etiquetas del sistema:

Filtrado de etiquetas definidas por el usuario para my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtrado de etiquetas del sistema o auditoría de TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Ejemplo: obtener una fila completa

Este ejemplo funciona con una tabla de Spanner llamada Singer que tiene la siguiente definición:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

En el modo de captura de valor predeterminado OLD_AND_NEW_VALUES de los flujos de cambios, cuando se actualiza una fila de Spanner, el registro de cambio de datos recibido solo contendrá las columnas que se hayan modificado. Las columnas de las que se ha hecho un seguimiento, pero que no han cambiado, no se incluirán en el registro. La clave principal de mod se puede usar para hacer una lectura de la instantánea de Spanner en la marca de tiempo de confirmación del registro de cambios de datos para obtener las columnas sin cambios o incluso recuperar toda la fila.

Ten en cuenta que es posible que tengas que cambiar la política de conservación de la base de datos a un valor mayor o igual que la política de conservación del flujo de cambios para que la lectura de la instantánea se realice correctamente.

También debes tener en cuenta que usar el tipo de captura de valor NEW_ROW es la forma recomendada y más eficiente de hacerlo, ya que devuelve todas las columnas de la fila de forma predeterminada y no requiere una lectura de instantánea adicional en Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Esta transformación realizará una lectura obsoleta en la marca de tiempo de confirmación de cada registro recibido y asignará la fila completa a JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Este código crea un cliente de base de datos de Spanner para realizar la obtención de filas completa y configura el grupo de sesiones para que tenga solo unas pocas sesiones, realizando lecturas en una instancia de ToFullReowJsonFn de forma secuencial. Dataflow se asegura de generar muchas instancias de esta función, cada una con su propio grupo de clientes.

Ejemplo: Spanner a Pub/Sub

En este caso, la persona que llama transmite registros a Pub/Sub lo más rápido posible, sin agruparlos ni agregarlos. Es una buena opción para activar el procesamiento posterior, ya que transmite todas las filas nuevas insertadas en una tabla de Spanner a Pub/Sub para su posterior procesamiento.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Ten en cuenta que el receptor de Pub/Sub se puede configurar para asegurar la semántica exactly-once.

Ejemplo: Spanner a Cloud Storage

En este caso, la persona que llama agrupa todos los registros en una ventana determinada y guarda el grupo en archivos de Cloud Storage independientes. Esta opción es adecuada para las analíticas y el archivo de un momento concreto, que es independiente del periodo de conservación de Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Ten en cuenta que el receptor de Cloud Storage proporciona la semántica de al menos una vez de forma predeterminada. Con un procesamiento adicional, se puede modificar para que tenga una semántica de procesamiento una sola vez.

También proporcionamos una plantilla de Dataflow para este caso práctico. Consulta Conectar flujos de cambios a Cloud Storage.

Ejemplo: de Spanner a BigQuery (tabla de registro)

En este caso, la persona que llama transmite registros de cambios a BigQuery. Cada registro de cambio de datos se refleja como una fila en BigQuery. Es una buena opción para las analíticas. Este código usa las funciones definidas anteriormente en la sección Obtener fila completa para recuperar la fila completa del registro y escribirla en BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Ten en cuenta que el receptor de BigQuery proporciona la semántica de al menos una vez de forma predeterminada. Con un procesamiento adicional, se puede modificar para que tenga una semántica de procesamiento una sola vez.

También proporcionamos una plantilla de Dataflow para este caso práctico. Consulta Conectar flujos de cambios a BigQuery.

Monitorizar una canalización

Hay dos clases de métricas disponibles para monitorizar una canalización de Dataflow de flujo de cambios.

Métricas estándar de Dataflow

Dataflow proporciona varias métricas para asegurarse de que su trabajo esté en buen estado, como la actualización de datos, el retraso del sistema, el rendimiento del trabajo, la utilización de la CPU de los trabajadores y más. Para obtener más información, consulta el artículo Usar Monitoring en las canalizaciones de Dataflow.

En el caso de las canalizaciones de flujo de cambios, hay dos métricas principales que se deben tener en cuenta: la latencia del sistema y la actualización de los datos.

La latencia del sistema indica la duración máxima actual (en segundos) durante la cual se procesa un elemento de datos o espera a procesarse.

La actualización de los datos te mostrará el tiempo transcurrido entre el momento actual (en tiempo real) y la marca de agua de salida. La marca de agua de salida del tiempo T indica que se han procesado todos los elementos con un tiempo de evento (estrictamente) anterior a T para el cálculo. Es decir, la actualización de los datos mide el nivel de actualización de la canalización en relación con el procesamiento de los eventos que ha recibido.

Si la canalización no tiene suficientes recursos, puede ver el efecto en estas dos métricas. La latencia del sistema aumentará, ya que los elementos tendrán que esperar más tiempo antes de procesarse. La actualización de los datos también aumentará, ya que la canalización no podrá seguir el ritmo de la cantidad de datos recibidos.

Métricas de flujo de cambios personalizadas

Estas métricas se exponen en Cloud Monitoring e incluyen lo siguiente:

  • Latencia agrupada (histograma) entre el momento en que se confirma un registro en Spanner y el momento en que el conector lo emite en una PCollection. Esta métrica se puede usar para detectar problemas de rendimiento (latencia) en la canalización.
  • Número total de registros de datos leídos. Es una indicación general del número de registros emitidos por el conector. Este número debería aumentar constantemente, reflejando la tendencia de las escrituras en la base de datos de Spanner subyacente.
  • Número de particiones que se están leyendo. Siempre debe haber particiones que se estén leyendo. Si este número es cero, significa que se ha producido un error en la canalización.
  • Número total de consultas emitidas durante la ejecución del conector. Es una indicación general de las consultas de flujo de cambios realizadas en la instancia de Spanner durante la ejecución de la canalización. Se puede usar para obtener una estimación de la carga del conector en la base de datos de Spanner.

Actualizar una canalización

Es posible actualizar una canalización en ejecución que use el conector SpannerIO para procesar flujos de cambios si se superan las comprobaciones de compatibilidad de trabajos. Para ello, debes definir explícitamente el parámetro del nombre de la tabla de metadatos de la nueva tarea al actualizarla. Usa el valor de la opción metadataTable pipeline del trabajo que estés actualizando.

Si usas una plantilla de Dataflow proporcionada por Google, define el nombre de la tabla con el parámetro spannerMetadataTableName. También puedes modificar el trabajo para usar explícitamente la tabla de metadatos con el método withMetadataTable(your-metadata-table-name) en la configuración del conector. Una vez que haya completado este paso, puede seguir las instrucciones de la sección Lanzar el trabajo de sustitución de la documentación de Dataflow para actualizar un trabajo en ejecución.

Prácticas recomendadas para los flujos de cambios y Dataflow

A continuación, se indican algunas prácticas recomendadas para crear conexiones de flujos de cambios mediante Dataflow.

Usar una base de datos de metadatos independiente

Le recomendamos que cree una base de datos independiente para que el conector SpannerIO la use para el almacenamiento de metadatos, en lugar de configurarla para que use la base de datos de su aplicación.

Para obtener más información, consulta el artículo Considerar una base de datos de metadatos independiente.

Determinar el tamaño del clúster

Una regla general para el número inicial de trabajadores en un trabajo de secuencias de cambios de Spanner es un trabajador por cada 1000 escrituras por segundo. Ten en cuenta que esta estimación puede variar en función de varios factores, como el tamaño de cada transacción, el número de registros de flujo de cambios que se producen a partir de una sola transacción y otras transformaciones, agregaciones o receptores que se utilicen en la canalización.

Después de asignar los recursos iniciales, es importante hacer un seguimiento de las métricas mencionadas en Monitorizar una canalización para asegurarse de que la canalización esté en buen estado. Te recomendamos que pruebes con un tamaño inicial del grupo de trabajadores y monitorices cómo gestiona tu canalización la carga, aumentando el número de nodos si es necesario. La utilización de la CPU es una métrica clave para comprobar si la carga es adecuada y si se necesitan más nodos.

Limitaciones conocidas

Hay algunas limitaciones conocidas al usar flujos de cambios de Spanner con Dataflow:

Autoescalado

Para poder usar el autoescalado en las canalizaciones que incluyan SpannerIO.readChangeStream se necesita Apache Beam 2.39.0 o una versión posterior.

Si usas una versión de Apache Beam anterior a 2.39.0, los flujos de procesamiento que incluyan SpannerIO.readChangeStream deben especificar explícitamente el algoritmo de autoescalado como NONE, tal como se describe en Autoescalado horizontal.

Para escalar manualmente un flujo de procesamiento de Dataflow en lugar de usar el autoescalado, consulta Escalar manualmente un flujo de procesamiento en tiempo real.

Runner V2

El conector de flujos de cambios de Spanner requiere Dataflow Runner V2. Esto debe especificarse manualmente durante la ejecución o se producirá un error. Puedes especificar Runner V2 configurando tu trabajo con --experiments=use_unified_worker,use_runner_v2.

Captura

El conector de secuencias de cambios de Spanner no admite snapshots de Dataflow.

Purgándose

El conector de flujos de cambios de Spanner no admite finalizar un trabajo. Solo se puede cancelar un trabajo que ya se haya creado.

También puedes actualizar un flujo de procesamiento sin necesidad de detenerlo.

OpenCensus

Para usar OpenCensus y monitorizar tu canalización, especifica la versión 0.28.3 o una posterior.

NullPointerException al iniciar el flujo de procesamiento

Un error en la versión 2.38.0 de Apache Beam puede provocar un NullPointerException al iniciar la canalización en determinadas condiciones. De esta forma, el trabajo no se iniciará y se mostrará este mensaje de error:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Para solucionar este problema, use la versión 2.39.0 de Apache Beam o una posterior, o bien especifique manualmente la versión de beam-sdks-java-core como 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Más información