En esta página, se muestra cómo crear canalizaciones de Dataflow que consumen y reenvían datos de cambios de Spanner con flujos de cambios. Puedes usar el código de ejemplo de esta página para compilar canalizaciones personalizadas.
Conceptos básicos
A continuación, se incluyen algunos conceptos básicos de las canalizaciones de Dataflow para flujos de cambios.
Dataflow
Dataflow es un servicio sin servidores, rápido y rentable que admite el procesamiento por lotes y de transmisión. Proporciona portabilidad con trabajos de procesamiento escritos con las bibliotecas de código abierto de Apache Beam y automatiza el aprovisionamiento de infraestructura y la administración de clústeres. Dataflow proporciona una transmisión casi en tiempo real cuando se lee desde flujos de cambios.
Puedes usar Dataflow para consumir flujos de cambios de Spanner con el c conector de SpannerIO, que ofrece una abstracción sobre la API de Spanner para consultar flujos de cambios. Con este conector, no tienes que administrar el ciclo de vida de la partición de los flujos de cambios, lo que es necesario cuando usas la API de Spanner directamente. El conector te proporciona un flujo de registros de cambios de datos para que puedas enfocarte más en la lógica de la aplicación y menos en los detalles específicos de la API y la partición dinámica del flujo de cambios. Recomendamos usar el conector SpannerIO en lugar de la API de Spanner en la mayoría de las circunstancias en las que necesitas leer datos de flujos de cambios.
Las plantillas de Dataflow son canalizaciones de Dataflow precompiladas que implementan casos de uso comunes. Consulta Plantillas de Dataflow para obtener una descripción general.
Canalización de Dataflow
Una canalización de Dataflow de flujos de cambios de Spanner se compone de cuatro partes principales:
- Una base de datos de Spanner con un flujo de cambios
- El conector SpannerIO
- Transformaciones y destinos definidos por el usuario
- Un escritor de E/S de destino de Apache Beam
Flujo de cambios de Spanner
Para obtener detalles sobre cómo crear un flujo de cambios, consulta Crea un flujo de cambios.
Conector SpannerIO de Apache Beam
Este es el conector SpannerIO que se describió en la sección anterior de Dataflow.
Es un conector de E/S de origen que emite un PCollection
de registros de cambios de datos a etapas posteriores de la canalización. La hora del evento para cada registro de cambio de datos emitido será la marca de tiempo de confirmación. Ten en cuenta que los registros emitidos son no ordenados y que el conector SpannerIO garantiza que no habrá registros tardíos.
Cuando se trabaja con flujos de cambios, Dataflow usa puntos de control. Como resultado, cada trabajador puede esperar hasta el intervalo de punto de control configurado para almacenar los cambios en búfer antes de enviarlos para su procesamiento posterior.
Transformaciones definidas por el usuario
Una transformación definida por el usuario le permite al usuario agregar, transformar o modificar datos de procesamiento dentro de una canalización de Dataflow. Los casos de uso más comunes para esto son la eliminación de información de identificación personal, la satisfacción de los requisitos de formato de datos downstream y la clasificación. Consulta la documentación oficial de Apache Beam para obtener la guía de programación sobre transformaciones.
Escritor de E/S de destino de Apache Beam
Apache Beam contiene conectores de E/S integrados que se pueden usar para escribir desde una canalización de Dataflow en un receptor de datos como BigQuery. Se admiten de forma nativa los destinos de datos más comunes.
Plantillas de Dataflow
Las plantillas de Dataflow proporcionan un método para crear trabajos de Dataflow basados en imágenes de Docker precompiladas para casos de uso comunes con la consola de Google Cloud , la CLI de Google Cloud o llamadas a la API de Rest.
Para los flujos de cambios de Spanner, proporcionamos tres plantillas flexibles de Dataflow:
Configura los permisos de IAM para las plantillas de Dataflow
Antes de crear un trabajo de Dataflow con las tres plantillas flexibles que se enumeran, asegúrate de tener los permisos de IAM necesarios para las siguientes cuentas de servicio:
Si no tienes los permisos de IAM necesarios, debes especificar una cuenta de servicio de trabajador administrada por el usuario para crear el trabajo de Dataflow. Para obtener más información, consulta Seguridad y permisos de Dataflow.
Cuando intentas ejecutar un trabajo desde una plantilla flexible de Dataflow sin todos los permisos necesarios, el trabajo podría fallar con un error de no poder leer el archivo de resultados o un error de permiso denegado en el recurso. Para obtener más información, consulta Soluciona problemas de plantillas flexibles.
Crea una canalización de Dataflow
En esta sección, se describe la configuración inicial del conector y se proporcionan ejemplos de integraciones comunes con la función de flujos de cambios de Spanner.
Para seguir estos pasos, necesitas un entorno de desarrollo de Java para Dataflow. Para obtener más información, consulta Crea una canalización de Dataflow con Java.
Crear transmisión de cambios
Para obtener más información sobre cómo crear un flujo de cambios, consulta Crea un flujo de cambios. Para continuar con los siguientes pasos, debes tener una base de datos de Spanner con un flujo de cambios configurado.
Otorga privilegios de control de acceso detallado
Si esperas que algún usuario de control de acceso detallado ejecute el trabajo de Dataflow,
asegúrate de que los usuarios tengan acceso a un rol de base de datos
que tenga el privilegio SELECT
en la transmisión de cambios y el privilegio EXECUTE
en la función con valores de tabla de la transmisión de cambios. Además, asegúrate de que el principal especifique el rol de la base de datos en la configuración de SpannerIO o en la plantilla de Dataflow Flex.
Para obtener más información, consulta Información sobre el control de acceso detallado.
Agrega el conector SpannerIO como dependencia
El conector SpannerIO de Apache Beam encapsula la complejidad de consumir los flujos de cambios directamente con la API de Cloud Spanner y emite una PCollection de registros de datos de flujo de cambios a etapas posteriores de la canalización.
Estos objetos se pueden consumir en otras etapas de la canalización de Dataflow del usuario. La integración del flujo de cambios forma parte del conector SpannerIO. Para poder usar el conector SpannerIO, debes agregar la dependencia al archivo pom.xml
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>
Crea una base de datos de metadatos
El conector debe hacer un seguimiento de cada partición cuando se ejecuta la canalización de Apache Beam. Mantiene estos metadatos en una tabla de Spanner que crea el conector durante la inicialización. Cuando configures el conector, debes especificar la base de datos en la que se creará esta tabla.
Como se describe en las prácticas recomendadas de flujos de cambios, te recomendamos que crees una base de datos nueva para este fin, en lugar de permitir que el conector use la base de datos de tu aplicación para almacenar su tabla de metadatos.
El propietario de un trabajo de Dataflow que usa el conector de SpannerIO debe tener configurados los siguientes permisos de IAM con esta base de datos de metadatos:
spanner.databases.updateDdl
spanner.databases.beginReadOnlyTransaction
spanner.databases.beginOrRollbackReadWriteTransaction
spanner.databases.read
spanner.databases.select
spanner.databases.write
spanner.sessions.create
spanner.sessions.get
Configura el conector
El conector de flujos de cambios de Spanner se puede configurar de la siguiente manera:
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
startTime.getSeconds() + (10 * 60),
startTime.getNanos()
);
SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-meta-instance-id")
.withMetadataDatabase("my-meta-database-id")
.withMetadataTable("my-meta-table-name")
.withRpcPriority(RpcPriority.MEDIUM)
.withInclusiveStartAt(startTime)
.withInclusiveEndAt(endTime);
A continuación, se incluyen descripciones de las opciones de readChangeStream()
:
Configuración de Spanner (obligatoria)
Se usa para configurar el proyecto, la instancia y la base de datos en la que se creó el flujo de cambios y desde la que se debe consultar. También especifica de manera opcional el rol de base de datos que se usará cuando el principal de IAM que ejecuta la tarea de Dataflow sea un usuario de control de acceso detallado. El trabajo asume este rol de base de datos para acceder al flujo de cambios. Para obtener más información, consulta Información sobre el control de acceso detallado.
Nombre del flujo de cambios (obligatorio)
Este nombre identifica de forma única el flujo de cambios. El nombre aquí debe ser el mismo que se usó cuando se creó.
ID de instancia de metadatos (opcional)
Esta es la instancia para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.
ID de la base de datos de metadatos (obligatorio)
Esta es la base de datos para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.
Nombre de la tabla de metadatos (opcional)
Esto solo debe usarse cuando se actualiza una canalización existente.
Este es el nombre de la tabla de metadatos preexistente que usará el conector. El conector lo usa para almacenar los metadatos y controlar el consumo de los datos de la API del flujo de cambios. Si se omite esta opción, Spanner crea una tabla nueva con un nombre generado en la inicialización del conector.
Prioridad de RPC (opcional)
La prioridad de la solicitud que se usará para las consultas del flujo de cambios. Si se omite este parámetro, se usará high
priority
.
InclusiveStartAt (obligatorio)
Los cambios de la marca de tiempo determinada se devuelven al llamador.
InclusiveEndAt (opcional)
Los cambios hasta la marca de tiempo determinada se devuelven al llamador. Si se omite este parámetro, los cambios se emitirán de forma indefinida.
Agrega transformaciones y destinos para procesar datos de cambios
Una vez completados los pasos anteriores, el conector SpannerIO configurado está listo para emitir una PCollection de objetos DataChangeRecord
.
Consulta Ejemplos de transformaciones y destinos para ver varias configuraciones de canalización de muestra que procesan estos datos transmitidos de varias maneras.
Ten en cuenta que los registros de flujo de cambios que emite el conector SpannerIO no están ordenados. Esto se debe a que las PCollections no proporcionan ninguna garantía de orden. Si necesitas una transmisión ordenada, debes agrupar y ordenar los registros como transformaciones en tus canalizaciones. Consulta Ejemplo: Ordenar por clave. Puedes extender este ejemplo para ordenar los registros según cualquier campo de los registros, como los IDs de transacción.
Ejemplos de transformaciones y receptores
Puedes definir tus propias transformaciones y especificar los receptores en los que se escribirán los datos. La documentación de Apache Beam proporciona una gran cantidad de transformaciones que se pueden aplicar, así como conectores de E/S listos para usar para escribir los datos en sistemas externos.
Ejemplo: Ordenar por clave
En esta muestra de código, se emiten registros de cambios de datos ordenados por marca de tiempo de confirmación y agrupados por claves primarias con el conector de Dataflow.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new BreakRecordByModFn()))
.apply(ParDo.of(new KeyByIdFn()))
.apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
// Subsequent processing goes here
En esta muestra de código, se usan estados y temporizadores para almacenar en búfer los registros de cada clave y establecer el tiempo de vencimiento del temporizador en un momento T
configurado por el usuario en el futuro (definido en la función BufferKeyUntilOutputTimestamp). Cuando la marca de agua de Dataflow pasa el tiempo T
, este código borra todos los registros del búfer con una marca de tiempo inferior a T
, ordena estos registros por marca de tiempo de confirmación y genera un par clave-valor en el que se cumple lo siguiente:
- La clave es la clave de entrada, que es la clave primaria con hash en un array de bucket de tamaño 1000.
- El valor son los registros de cambios de datos ordenados que se almacenaron en búfer para la clave.
Para cada clave, tenemos las siguientes garantías:
- Se garantiza que los temporizadores se activen en orden de marca de tiempo de vencimiento.
- Se garantiza que las etapas descendentes reciban los elementos en el mismo orden en que se produjeron.
Por ejemplo, con una clave del valor 100, el temporizador se activa en T1
y T10
, respectivamente, lo que produce un paquete de registros de cambios de datos en cada marca de tiempo. Debido a que los registros de cambios de datos que se generaron en T1
se produjeron antes que los registros de cambios de datos que se generaron en T10
, se garantiza que los registros de cambios de datos que se generaron en T1
también se reciban en la siguiente etapa antes que los registros de cambios de datos que se generaron en T10
. Este mecanismo nos ayuda a garantizar un orden estricto de marcas de tiempo de confirmación por clave primaria para el procesamiento descendente.
Este proceso se repetirá hasta que finalice la canalización y se hayan procesado todos los registros de cambios de datos (o se repetirá de forma indefinida si no se especifica una hora de finalización).
Ten en cuenta que esta muestra de código usa estados y temporizadores, en lugar de ventanas, para realizar el ordenamiento por clave. El motivo es que no se garantiza que los períodos se procesen en orden. Esto significa que las ventanas más antiguas se pueden procesar más tarde que las más recientes, lo que podría generar un procesamiento fuera de orden.
BreakRecordByModFn
Cada registro de cambio de datos puede contener varios mods. Cada mod representa una inserción, actualización o eliminación en un solo valor de clave primaria. Esta función divide cada registro de cambio de datos en registros de cambio de datos independientes, uno por mod.
private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
DataChangeRecord> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
outputReceiver) {
record.getMods().stream()
.map(
mod ->
new DataChangeRecord(
record.getPartitionToken(),
record.getCommitTimestamp(),
record.getServerTransactionId(),
record.isLastRecordInTransactionInPartition(),
record.getRecordSequence(),
record.getTableName(),
record.getRowType(),
Collections.singletonList(mod),
record.getModType(),
record.getValueCaptureType(),
record.getNumberOfRecordsInTransaction(),
record.getNumberOfPartitionsInTransaction(),
record.getTransactionTag(),
record.isSystemTransaction(),
record.getMetadata()))
.forEach(outputReceiver::output);
}
}
KeyByIdFn
Esta función toma un DataChangeRecord
y muestra un DataChangeRecord
con la clave de la clave primaria de Spanner con hash a un valor entero.
private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
// NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
// Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
// of states and timers for performance purposes.
// Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
// On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
private static final int NUMBER_OF_BUCKETS = 1000;
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
// Hash the received keys into a bucket in order to have a
// deterministic number of buffers and timers.
String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);
outputReceiver.output(KV.of(bucketIndex, record));
}
}
BufferKeyUntilOutputTimestamp
Los temporizadores y los búferes son por clave. Esta función almacena en búfer cada registro de cambio de datos hasta que la marca de agua pasa la marca de tiempo en la que queremos generar los registros de cambio de datos almacenados en búfer.
Este código utiliza un temporizador de bucle para determinar cuándo limpiar el búfer:
- Cuando ve un registro de cambio de datos para una clave por primera vez, establece el temporizador para que se active en la marca de tiempo de confirmación del registro de cambio de datos +
incrementIntervalSeconds
(una opción que el usuario puede configurar). - Cuando se activa el temporizador, agrega todos los registros de cambios de datos en el búfer con una marca de tiempo inferior al tiempo de vencimiento del temporizador a
recordsToOutput
. Si el búfer tiene registros de cambios de datos cuya marca de tiempo es mayor o igual que la hora de vencimiento del temporizador, vuelve a agregar esos registros de cambios de datos al búfer en lugar de enviarlos. Luego, establece el siguiente temporizador en el tiempo de vencimiento del temporizador actual másincrementIntervalInSeconds
. - Si
recordsToOutput
no está vacía, la función ordena los registros de cambios de datos enrecordsToOutput
por marca de tiempo de confirmación y ID de transacción y, luego, los muestra.
private static class BufferKeyUntilOutputTimestamp extends
DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>> {
private static final Logger LOG =
LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);
private final long incrementIntervalInSeconds = 2;
private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
this.incrementIntervalInSeconds = incrementIntervalInSeconds;
}
@SuppressWarnings("unused")
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("keyString")
private final StateSpec<ValueState<String>> keyString =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void process(
@Element KV<String, DataChangeRecord> element,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
buffer.add(element.getValue());
// Only set the timer if this is the first time we are receiving a data change
// record with this key.
String elementKey = keyString.read();
if (elementKey == null) {
Instant commitTimestamp =
new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
Instant outputTimestamp =
commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
timer.set(outputTimestamp);
keyString.write(element.getKey());
}
}
@OnTimer("timer")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@TimerId("timer") Timer timer,
@StateId("keyString") ValueState<String> keyString) {
if (!buffer.isEmpty().read()) {
String elementKey = keyString.read();
final List<DataChangeRecord> records =
StreamSupport.stream(buffer.read().spliterator(), false)
.collect(Collectors.toList());
buffer.clear();
List<DataChangeRecord> recordsToOutput = new ArrayList<>();
for (DataChangeRecord record : records) {
Instant recordCommitTimestamp =
new Instant(record.getCommitTimestamp().toSqlTimestamp());
final String recordString =
record.getMods().get(0).getNewValuesJson().isEmpty()
? "Deleted record"
: record.getMods().get(0).getNewValuesJson();
// When the watermark passes time T, this means that all records with
// event time < T have been processed and successfully committed. Since the
// timer fires when the watermark passes the expiration time, we should
// only output records with event time < expiration time.
if (recordCommitTimestamp.isBefore(context.timestamp())) {
LOG.info(
"Outputting record with key {} and value {} at expiration " +
"timestamp {}",
elementKey,
recordString,
context.timestamp().toString());
recordsToOutput.add(record);
} else {
LOG.info(
"Expired at {} but adding record with key {} and value {} back to " +
"buffer due to commit timestamp {}",
context.timestamp().toString(),
elementKey,
recordString,
recordCommitTimestamp.toString());
buffer.add(record);
}
}
// Output records, if there are any to output.
if (!recordsToOutput.isEmpty()) {
// Order the records in place, and output them. The user would need
// to implement DataChangeRecordComparator class that sorts the
// data change records by commit timestamp and transaction ID.
Collections.sort(recordsToOutput, new DataChangeRecordComparator());
context.outputWithTimestamp(
KV.of(elementKey, recordsToOutput), context.timestamp());
LOG.info(
"Expired at {}, outputting records for key {}",
context.timestamp().toString(),
elementKey);
} else {
LOG.info("Expired at {} with no records", context.timestamp().toString());
}
}
Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
LOG.info("Setting next timer to {}", nextTimer.toString());
timer.set(nextTimer);
} else {
LOG.info(
"Timer not being set since the buffer is empty: ");
keyString.clear();
}
}
}
Cómo ordenar transacciones
Esta canalización se puede cambiar para ordenar por ID de transacción y marca de tiempo de confirmación. Para ello, almacena en búfer los registros de cada par de ID de transacción o marca de tiempo de confirmación, en lugar de cada clave de Spanner. Esto requiere la modificación del código en KeyByIdFn.
Ejemplo: Cómo ensamblar transacciones
Esta muestra de código lee registros de cambios de datos, ensambla todos los registros de cambios de datos que pertenecen a la misma transacción en un solo elemento y muestra ese elemento. Ten en cuenta que las transacciones que genera este código de muestra no se ordenan por marca de tiempo de confirmación.
En esta muestra de código, se usan búferes para ensamblar transacciones a partir de registros de cambios de datos. Cuando recibe un registro de cambio de datos que pertenece a una transacción por primera vez, lee el campo numberOfRecordsInTransaction
en el registro de cambio de datos, que describe la cantidad esperada de registros de cambio de datos que pertenecen a esa transacción. Almacena en búfer los registros de cambios de datos que pertenecen a esa transacción hasta que la cantidad de registros almacenados en búfer coincida con numberOfRecordsInTransaction
, momento en el que se escriben los registros de cambios de datos agrupados.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new KeyByTransactionIdFn()))
.apply(ParDo.of(new TransactionBoundaryFn()))
// Subsequent processing goes here
KeyByTransactionIdFn
Esta función toma un DataChangeRecord
y muestra un DataChangeRecord
con la clave del ID de transacción.
private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
@ProcessElement
public void processElement(
@Element DataChangeRecord record,
OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
outputReceiver.output(KV.of(record.getServerTransactionId(), record));
}
}
TransactionBoundaryFn
TransactionBoundaryFn
almacena en búfer los pares clave-valor recibidos de {TransactionId, DataChangeRecord}
desde KeyByTransactionIdFn
y los almacena en búfer en grupos según TransactionId
. Cuando la cantidad de registros almacenados en búfer es igual a la cantidad de registros contenidos en toda la transacción, esta función ordena los objetos DataChangeRecord
del grupo por secuencia de registro y genera un par clave-valor de {CommitTimestamp, TransactionId}
, Iterable<DataChangeRecord>
.
Aquí, suponemos que SortKey
es una clase definida por el usuario que representa un par {CommitTimestamp, TransactionId}
. Para obtener más información sobre SortKey
, consulta la implementación de ejemplo.
private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
@StateId("buffer")
private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<DataChangeRecord> buffer,
@StateId("count") ValueState<Integer> countState) {
final KV<String, DataChangeRecord> element = context.element();
final DataChangeRecord record = element.getValue();
buffer.add(record);
int count = (countState.read() != null ? countState.read() : 0);
count = count + 1;
countState.write(count);
if (count == record.getNumberOfRecordsInTransaction()) {
final List<DataChangeRecord> sortedRecords =
StreamSupport.stream(buffer.read().spliterator(), false)
.sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
.collect(Collectors.toList());
final Instant commitInstant =
new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
.getTime());
context.outputWithTimestamp(
KV.of(
new SortKey(sortedRecords.get(0).getCommitTimestamp(),
sortedRecords.get(0).getServerTransactionId()),
sortedRecords),
commitInstant);
buffer.clear();
countState.clear();
}
}
}
Ejemplo: Filtra por etiqueta de transacción
Cuando se etiqueta una transacción que modifica los datos del usuario, la etiqueta correspondiente y su tipo se almacenan como parte de DataChangeRecord
. En estos ejemplos, se muestra cómo filtrar registros de flujo de cambios según etiquetas de transacción definidas por el usuario y etiquetas del sistema:
Filtrado de etiquetas definido por el usuario para my-tx-tag
:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
!record.isSystemTransaction()
&& record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
// Subsequent processing goes here
Filtrado de etiquetas del sistema o auditoría de TTL:
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(Filter.by(record ->
record.isSystemTransaction()
&& record.getTransactionTag().equals("RowDeletionPolicy")))
// Subsequent processing goes here
Ejemplo: Cómo recuperar una fila completa
Este ejemplo funciona con una tabla de Spanner llamada Singer
que tiene la siguiente definición:
CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024)
) PRIMARY KEY (SingerId);
En el modo de captura de valor OLD_AND_NEW_VALUES
predeterminado de los flujos de cambios,
cuando haya una actualización de una fila de Spanner, el registro de cambios de datos
recibido contendrá solo las columnas que se modificaron. Las columnas a las que se les hizo un seguimiento, pero que no se modificaron, no se incluirán en el registro. La clave primaria del
mod se puede usar para realizar una lectura de instantánea de Spanner en la marca de tiempo
de confirmación del registro de cambios de datos para recuperar las columnas sin modificar o incluso
recuperar la fila completa.
Ten en cuenta que es posible que debas cambiar la política de retención de la base de datos a un valor mayor o igual que la política de retención del flujo de cambios para que la lectura de la instantánea se realice correctamente.
Además, ten en cuenta que usar el tipo de captura de valor NEW_ROW
es la forma recomendada y más eficiente de hacerlo, ya que muestra todas las columnas de seguimiento de la fila de forma predeterminada y no requiere una lectura de instantánea adicional en Spanner.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
// Assume we have a change stream "my-change-stream" that watches Singers table.
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
// Subsequent processing goes here
ToFullRowJsonFn
Esta transformación realizará una lectura inactiva en la marca de tiempo de confirmación de cada registro recibido y asignará la fila completa a JSON.
public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
// Since each instance of this DoFn will create its own session pool and will
// perform calls to Spanner sequentially, we keep the number of sessions in
// the pool small. This way, we avoid wasting resources.
private static final int MIN_SESSIONS = 1;
private static final int MAX_SESSIONS = 5;
private final String projectId;
private final String instanceId;
private final String databaseId;
private transient DatabaseClient client;
private transient Spanner spanner;
public ToFullRowJsonFn(SpannerConfig spannerConfig) {
this.projectId = spannerConfig.getProjectId().get();
this.instanceId = spannerConfig.getInstanceId().get();
this.databaseId = spannerConfig.getDatabaseId().get();
}
@Setup
public void setup() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions
.newBuilder()
.setMinSessions(MIN_SESSIONS)
.setMaxSessions(MAX_SESSIONS)
.build();
SpannerOptions options = SpannerOptions
.newBuilder()
.setProjectId(projectId)
.setSessionPoolOption(sessionPoolOptions)
.build();
DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
spanner = options.getService();
client = spanner.getDatabaseClient(id);
}
@Teardown
public void teardown() {
spanner.close();
}
@ProcessElement
public void process(
@Element DataChangeRecord element,
OutputReceiver<String> output) {
com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
element.getMods().forEach(mod -> {
JSONObject keysJson = new JSONObject(mod.getKeysJson());
JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
ModType modType = element.getModType();
JSONObject jsonRow = new JSONObject();
long singerId = keysJson.getLong("SingerId");
jsonRow.put("SingerId", singerId);
if (modType == ModType.INSERT) {
// For INSERT mod, get non-primary key columns from mod.
jsonRow.put("FirstName", newValuesJson.get("FirstName"));
jsonRow.put("LastName", newValuesJson.get("LastName"));
} else if (modType == ModType.UPDATE) {
// For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
try (ResultSet resultSet = client
.singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
.read(
"Singers",
KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
Arrays.asList("FirstName", "LastName"))) {
if (resultSet.next()) {
jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
JSONObject.NULL : resultSet.getString("FirstName"));
jsonRow.put("LastName", resultSet.isNull("LastName") ?
JSONObject.NULL : resultSet.getString("LastName"));
}
}
} else {
// For DELETE mod, there is nothing to do, as we already set SingerId.
}
output.output(jsonRow.toString());
});
}
}
Este código crea un cliente de base de datos de Spanner para realizar la recuperación de filas completa y configura el grupo de sesiones para que tenga solo algunas sesiones, realizando lecturas en una instancia de ToFullReowJsonFn
de forma secuencial.
Dataflow se asegura de crear muchas instancias de esta función, cada una con su propio grupo de clientes.
Ejemplo: De Spanner a Pub/Sub
En esta situación, el llamador transmite registros a Pub/Sub lo más rápido posible, sin ninguna agrupación ni agregación. Esta es una buena opción para activar el procesamiento descendente, así como transmitir todas las filas nuevas insertadas en una tabla de Spanner a Pub/Sub para un procesamiento adicional.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(PubsubIO.writeStrings().to("my-topic"));
Ten en cuenta que el sink de Pub/Sub se puede configurar para garantizar la semántica de exactamente una vez.
Ejemplo: Spanner a Cloud Storage
En esta situación, el llamador agrupa todos los registros dentro de un período determinado y guarda el grupo en archivos de Cloud Storage independientes. Esta es una buena opción para las estadísticas y el archivo de un momento determinado, que es independiente del período de retención de Spanner.
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role")) // Needed for fine-grained access control only
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO
.write()
.to("gs://my-bucket/change-stream-results-")
.withSuffix(".txt")
.withWindowedWrites()
.withNumShards(1));
Ten en cuenta que el sink de Cloud Storage proporciona semántica de al menos una vez de forma predeterminada. Con procesamiento adicional, se puede modificar para tener semántica de una y solo una vez.
También proporcionamos una plantilla de Dataflow para este caso de uso: consulta Cómo conectar flujos de cambios a Cloud Storage.
Ejemplo: Spanner a BigQuery (tabla de registro)
Aquí, el llamador transmite registros de cambios a BigQuery. Cada registro de cambios de datos se refleja como una fila en BigQuery. Esta es una buena opción para las estadísticas. Este código usa las funciones definidas anteriormente, en la sección Cómo recuperar una fila completa, para recuperar la fila completa del registro y escribirla en BigQuery.
SpannerConfig spannerConfig = SpannerConfig
.create()
.withProjectId("my-project-id")
.withInstanceId("my-instance-id")
.withDatabaseId("my-database-id")
.withDatabaseRole("my-database-role"); // Needed for fine-grained access control only
pipeline
.apply(SpannerIO
.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName("my-change-stream")
.withMetadataInstance("my-metadata-instance-id")
.withMetadataDatabase("my-metadata-database-id")
.withInclusiveStartAt(Timestamp.now()))
.apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
.apply(BigQueryIO
.<String>write()
.to("my-bigquery-table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema()
.setName("SingerId")
.setType("INT64")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("FirstName")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("LastName")
.setType("STRING")
.setMode("REQUIRED")
)))
.withAutoSharding()
.optimizedWrites()
.withFormatFunction((String element) -> {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(element);
} catch (IOException e) {
e.printStackTrace();
}
return new TableRow()
.set("SingerId", jsonNode.get("SingerId").asInt())
.set("FirstName", jsonNode.get("FirstName").asText())
.set("LastName", jsonNode.get("LastName").asText());
}
)
);
Ten en cuenta que el destino de BigQuery proporciona semántica de al menos una vez de forma predeterminada. Con un procesamiento adicional, se puede modificar para tener una semántica de una y solo una vez.
También proporcionamos una plantilla de Dataflow para este caso de uso. Consulta Cómo conectar flujos de cambios a BigQuery.
Supervisa una canalización
Existen dos clases de métricas disponibles para supervisar una canalización de Dataflow de flujo de cambios.
Métricas estándar de Dataflow
Dataflow proporciona varias métricas para garantizar que tu trabajo esté en buen estado, como la actualización de datos, la demora del sistema, la capacidad de procesamiento del trabajo, el uso de la CPU del trabajador y mucho más. Puedes encontrar más información en Usa Monitoring para las canalizaciones de Dataflow.
En el caso de las canalizaciones de flujos de cambios, hay dos métricas principales que se deben tener en cuenta: la latencia del sistema y la actualización de los datos.
La latencia del sistema te indicará la duración máxima actual (en segundos) durante la cual se procesa o espera el procesamiento de un elemento de datos.
La actualidad de los datos te mostrará la cantidad de tiempo transcurrido entre el momento actual (tiempo real) y la marca de agua de salida. La marca de agua de salida de la hora T
indica que todos los elementos con una hora del evento (estrictamente) anterior a T
se procesaron para su cálculo. En otras palabras, la actualización de los datos mide qué tan actualizada está la canalización en relación con el procesamiento de los eventos que recibió.
Si la canalización tiene pocos recursos, puedes ver ese efecto en estas dos métricas. La latencia del sistema aumentará, ya que los elementos deben esperar más tiempo antes de que se procesen. La actualidad de los datos también aumentará, ya que la canalización no podrá seguir el ritmo de la cantidad de datos recibidos.
Métricas de flujo de cambios personalizadas
Estas métricas se exponen en Cloud Monitoring y son las siguientes:
- Latencia agrupada (histograma) entre el momento en que se confirma un registro en Spanner y el momento en que el conector lo emite en una PCollection. Esta métrica se puede usar para ver si hay problemas de rendimiento (latencia) con la canalización.
- Es la cantidad total de registros de datos leídos. Esta es una indicación general de la cantidad de registros que emite el conector. Esta cantidad debería aumentar constantemente, reflejando la tendencia de las operaciones de escritura en la base de datos subyacente de Spanner.
- Es la cantidad de particiones que se están leyendo. Siempre debe haber particiones que se lean. Si este número es cero, significa que se produjo un error en la canalización.
- Es la cantidad total de consultas emitidas durante la ejecución del conector. Este es un indicador general de las consultas de flujo de cambios que se realizan a la instancia de Spanner durante la ejecución de la canalización. Esto se puede usar para obtener una estimación de la carga del conector a la base de datos de Spanner.
Actualiza una canalización existente
Es posible actualizar una canalización en ejecución que usa el conector de SpannerIO para procesar flujos de cambios si se aprueban las verificaciones de compatibilidad del trabajo. Para ello, debes establecer de forma explícita el parámetro de nombre de la tabla de metadatos del trabajo nuevo cuando lo actualices. Usa el valor de la opción de canalización metadataTable
del trabajo que estás actualizando.
Si usas una plantilla de Dataflow proporcionada por Google, establece el nombre de la tabla con el parámetro spannerMetadataTableName
. También puedes modificar tu trabajo existente para usar de forma explícita la tabla de metadatos con el método withMetadataTable(your-metadata-table-name)
en la configuración del conector. Una vez que lo hagas, puedes seguir las instrucciones en Cómo iniciar tu trabajo de reemplazo de la documentación de Dataflow para actualizar un trabajo en ejecución.
Prácticas recomendadas para los flujos de cambios y Dataflow
A continuación, se incluyen algunas prácticas recomendadas para compilar conexiones de flujos de cambios con Dataflow.
Usa una base de datos de metadatos independiente
Te recomendamos que crees una base de datos independiente para que el conector de SpannerIO la use para el almacenamiento de metadatos, en lugar de configurarlo para que use la base de datos de tu aplicación.
Para obtener más información, consulta Considera usar una base de datos de metadatos independiente.
Ajusta el tamaño del clúster
Una regla general para una cantidad inicial de trabajadores en un trabajo de flujos de cambios de Spanner es un trabajador por cada 1,000 operaciones de escritura por segundo. Ten en cuenta que esta estimación puede variar según varios factores, como el tamaño de cada transacción, la cantidad de registros de flujo de cambios que se producen a partir de una sola transacción y otras transformaciones, agregaciones o destinos que se usan en la canalización.
Después de la asignación de recursos inicial, es importante hacer un seguimiento de las métricas que se mencionan en Cómo supervisar una canalización para garantizar que la canalización esté en buen estado. Te recomendamos que experimentes con un tamaño inicial del grupo de trabajadores y supervises cómo tu canalización controla la carga, y que aumentes la cantidad de nodos si es necesario. El uso de la CPU es una métrica clave para verificar si la carga es adecuada y si se necesitan más nodos.
Limitaciones conocidas
Existen algunas limitaciones conocidas cuando se usan flujos de cambios de Spanner con Dataflow:
Ajuste de escala automático
La compatibilidad con el ajuste de escala automático para cualquier canalización que incluya SpannerIO.readChangeStream
requiere Apache Beam 2.39.0
o una versión posterior.
Si usas una versión de Apache Beam anterior a 2.39.0
, las canalizaciones que incluyen
SpannerIO.readChangeStream
deben especificar de forma explícita el algoritmo de ajuste de escala automático
como NONE
, como se describe en Ajuste de escala automático horizontal.
Para escalar una canalización de Dataflow de forma manual en lugar de usar el ajuste de escala automático, consulta Escala una canalización de transmisión de forma manual.
Runner v2
El conector de flujos de cambios de Spanner requiere Dataflow Runner v2.
Esto se debe especificar de forma manual durante la ejecución o se arrojará un error. Para especificar Runner V2
, configura tu trabajo con
--experiments=use_unified_worker,use_runner_v2
.
Instantánea
El conector de flujos de cambios de Spanner no admite instantáneas de flujo de datos.
Desviando
El conector de flujos de cambios de Spanner no admite vaciar un trabajo. Solo se puede cancelar un trabajo existente.
También puedes actualizar una canalización existente sin necesidad de detenerla.
OpenCensus
Para usar OpenCensus para supervisar tu canalización, especifica la versión 0.28.3 o una posterior.
NullPointerException
al inicio de la canalización
Un error en la versión 2.38.0
de Apache Beam puede causar un NullPointerException
cuando se inicia la canalización en determinadas condiciones. Esto evitaría que se inicie tu trabajo y, en su lugar, se mostrará este mensaje de error:
java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null
Para abordar este problema, usa la versión 2.39.0
o posterior de Apache Beam, o bien especifica manualmente la versión de beam-sdks-java-core
como 2.37.0
:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.37.0</version>
</dependency>