La plantilla de flujos de cambios de Bigtable a Pub/Sub es un flujo de procesamiento en streaming que transmite registros de cambios de datos de Bigtable y los publica en un tema de Pub/Sub mediante Dataflow.
Un flujo de cambios de Bigtable te permite suscribirte a mutaciones de datos por tabla. Cuando te suscribes a los flujos de cambios de una tabla, se aplican las siguientes restricciones:
- Solo se devuelven las celdas modificadas y los descriptores de las operaciones de eliminación.
- Solo se devuelve el nuevo valor de una celda modificada.
Cuando se publican registros de cambios de datos en un tema de Pub/Sub, los mensajes se pueden insertar en un orden diferente al orden original de las marcas de tiempo de confirmación de Bigtable.
Los registros de cambios de datos de Bigtable que no se pueden publicar en temas de Pub/Sub se colocan temporalmente en un directorio de cola de mensajes fallidos (cola de mensajes no procesados) en Cloud Storage. Una vez que se alcanza el número máximo de reintentos fallidos, estos registros se colocan indefinidamente en el mismo directorio de la cola de mensajes fallidos para que los revise un humano o para que el usuario los procese más adelante.
La canalización requiere que exista el tema de Pub/Sub de destino. El tema de destino puede configurarse para validar los mensajes mediante un esquema. Cuando un tema de Pub/Sub especifica un esquema, la canalización solo se inicia si el esquema es válido. En función del tipo de esquema, utilice una de las siguientes definiciones de esquema para el tema de destino:
Búferes de protocolo
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Usa el siguiente esquema de Protobuf con la codificación de mensajes JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Cada mensaje nuevo de Pub/Sub incluye una entrada de un registro de cambios de datos devuelto por el flujo de cambios de su fila correspondiente en tu tabla de Bigtable. La plantilla de Pub/Sub aplana las entradas de cada registro de cambio de datos en cambios individuales a nivel de celda.
Descripción del mensaje de salida de Pub/Sub
Nombre del campo | Descripción |
---|---|
rowKey |
Clave de la fila modificada. Llega en forma de matriz de bytes. Cuando se configura la codificación de mensajes JSON, las claves de las filas se devuelven como cadenas. Cuando se especifica useBase64Rowkeys , las claves de las filas se codifican en Base64. De lo contrario, se usa un conjunto de caracteres especificado por bigtableChangeStreamCharset para decodificar los bytes de la clave de fila en una cadena. |
modType |
El tipo de mutación de la fila. Usa uno de los siguientes valores: SET_CELL , DELETE_CELLS o DELETE_FAMILY . |
columnFamily |
La familia de columnas afectada por la mutación de la fila. |
column |
El calificador de columna afectado por la mutación de la fila. En el caso del tipo de mutación DELETE_FAMILY , el campo de columna no está definido. Llega en forma de matriz de bytes. Cuando se configura la codificación de mensajes JSON, las columnas se devuelven como cadenas. Cuando se especifica useBase64ColumnQualifier , el campo de columna se codifica en Base64. De lo contrario, se usa un conjunto de caracteres especificado por bigtableChangeStreamCharset para decodificar los bytes de la clave de fila en una cadena. |
commitTimestamp |
La hora en la que Bigtable aplica la mutación. La hora se mide en microsegundos desde el inicio del registro de tiempo de Unix (1 de enero de 1970 a las 00:00:00 UTC). |
timestamp |
Valor de marca de tiempo de la celda afectada por la mutación. En los tipos de mutación DELETE_CELLS y DELETE_FAMILY , no se define ninguna marca de tiempo. La hora se mide en microsegundos desde el inicio del registro de tiempo de Unix (1 de enero de 1970 a las 00:00:00 UTC). |
timestampFrom |
Describe el inicio inclusivo del intervalo de marca de tiempo de todas las celdas eliminadas por la mutación DELETE_CELLS . En el caso de otros tipos de mutaciones, timestampFrom no se define. La hora se mide en microsegundos desde el inicio del registro de tiempo de Unix (1 de enero de 1970 a las 00:00:00 UTC). |
timestampTo |
Describe el final exclusivo del intervalo de marca de tiempo de todas las celdas eliminadas por la mutación DELETE_CELLS . En el caso de otros tipos de mutaciones, timestampTo no se define. |
isGC |
Valor booleano que indica si la mutación se ha generado mediante un mecanismo de recogida de elementos no utilizados de Bigtable. |
tieBreaker |
Cuando dos mutaciones se registran al mismo tiempo en diferentes clústeres de Bigtable, se aplica a la tabla de origen la mutación con el valor tiebreaker más alto. Las mutaciones con valores de tiebreaker más bajos se descartan. |
value |
El nuevo valor definido por la mutación. A menos que se defina la opción de canalización stripValues , el valor se asigna a las mutaciones de SET_CELL . En el caso de otros tipos de mutación, el valor no se define. Llega en forma de matriz de bytes. Cuando se configura la codificación de mensajes JSON, los valores se devuelven como cadenas.
Si se especifica useBase64Values , el valor se codifica en Base64. De lo contrario, se usa un conjunto de caracteres especificado por bigtableChangeStreamCharset para decodificar los bytes de valor en una cadena. |
sourceInstance |
Nombre de la instancia de Bigtable que ha registrado la mutación. Puede ocurrir cuando varias canalizaciones transmiten cambios de diferentes instancias al mismo tema de Pub/Sub. |
sourceCluster |
El nombre del clúster de Bigtable que ha registrado la mutación. Se puede usar cuando varias canalizaciones transmiten cambios de diferentes instancias al mismo tema de Pub/Sub. |
sourceTable |
Nombre de la tabla de Bigtable que ha recibido la mutación. Se puede usar en el caso de que una transmisión de varias canalizaciones cambie de diferentes tablas al mismo tema de Pub/Sub. |
Requisitos del flujo de procesamiento
- La instancia de origen de Bigtable especificada.
- La tabla de origen de Bigtable especificada. La tabla debe tener habilitados los flujos de cambios.
- El perfil de aplicación de Bigtable especificado.
- El tema de Pub/Sub especificado debe existir.
Parámetros de plantilla
Parámetros obligatorios
- pubSubTopic el nombre del tema de Pub/Sub de destino.
- bigtableChangeStreamAppProfile el ID del perfil de aplicación de Bigtable. El perfil de la aplicación debe usar el enrutamiento de un solo clúster y permitir transacciones de una sola fila.
- bigtableReadInstanceId el ID de la instancia de Bigtable de origen.
- bigtableReadTableId el ID de la tabla de Bigtable de origen.
Parámetros opcionales
- messageEncoding la codificación de los mensajes que se publicarán en el tema de Pub/Sub. Cuando se configura el esquema del tema de destino, la codificación de los mensajes se determina mediante los ajustes del tema. Se admiten los siguientes valores:
BINARY
yJSON
. El valor predeterminado esJSON
. - messageFormat la codificación de los mensajes que se publicarán en el tema de Pub/Sub. Cuando se configura el esquema del tema de destino, la codificación de los mensajes se determina mediante los ajustes del tema. Se admiten los siguientes valores:
AVRO
,PROTOCOL_BUFFERS
yJSON
. El valor predeterminado esJSON
. Cuando se usa el formatoJSON
, los campos rowKey, column y value del mensaje son cadenas cuyo contenido se determina mediante las opciones de la canalizaciónuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
ybigtableChangeStreamCharset
. - stripValues si se define como
true
, las mutaciones deSET_CELL
se devuelven sin que se hayan definido nuevos valores. El valor predeterminado esfalse
. Este parámetro es útil cuando no necesitas que haya un valor nuevo (también conocido como invalidación de caché) o cuando los valores son extremadamente grandes y superan los límites de tamaño de los mensajes de Pub/Sub. - dlqDirectory el directorio de la cola de mensajes fallidos. Los registros que no se pueden procesar se almacenan en este directorio. El valor predeterminado es un directorio de la ubicación temporal de la tarea de Dataflow. En la mayoría de los casos, puede usar la ruta predeterminada.
- dlqRetryMinutes el número de minutos que transcurren entre los reintentos de la cola de mensajes fallidos. El valor predeterminado es
10
. - dlqMaxRetries el número máximo de reintentos de mensajes fallidos. El valor predeterminado es
5
. - useBase64Rowkeys se usa con la codificación de mensajes JSON. Si se define como
true
, el camporowKey
es una cadena codificada en Base64. De lo contrario,rowKey
se genera usandobigtableChangeStreamCharset
para decodificar bytes en una cadena. El valor predeterminado esfalse
. - pubSubProjectId el ID del proyecto de Bigtable. El valor predeterminado es el proyecto de la tarea de Dataflow.
- useBase64ColumnQualifiers se usa con la codificación de mensajes JSON. Si se define como
true
, el campocolumn
es una cadena codificada en Base64. De lo contrario, la columna se genera usandobigtableChangeStreamCharset
para decodificar bytes en una cadena. El valor predeterminado esfalse
. - useBase64Values se usa con la codificación de mensajes JSON. Si se define como
true
, el campo de valor es una cadena codificada en Base64. De lo contrario, el valor se genera usandobigtableChangeStreamCharset
para decodificar bytes en una cadena. El valor predeterminado esfalse
. - disableDlqRetries indica si se deben inhabilitar los reintentos de la cola de mensajes fallidos. Valor predeterminado: false.
- bigtableChangeStreamMetadataInstanceId el ID de instancia de metadatos de los flujos de cambios de Bigtable. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamMetadataTableTableId el ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, se crea automáticamente una tabla de metadatos del conector de flujos de cambios de Bigtable durante la ejecución de la canalización. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamCharset nombre del conjunto de caracteres de los flujos de cambios de Bigtable. El valor predeterminado es UTF-8.
- bigtableChangeStreamStartTimestamp marca de tiempo inicial (https://tools.ietf.org/html/rfc3339) (inclusive) que se usará para leer los flujos de cambios. Por ejemplo,
2022-05-05T07:59:59Z
. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización. - bigtableChangeStreamIgnoreColumnFamilies lista de cambios en los nombres de las familias de columnas separados por comas que se deben ignorar. El valor predeterminado es una cadena vacía.
- bigtableChangeStreamIgnoreColumns lista de cambios de nombres de columnas separados por comas que se deben ignorar. Ejemplo: "cf1:col1,cf2:col2". El valor predeterminado es una cadena vacía.
- bigtableChangeStreamName nombre único de la canalización del cliente. Te permite reanudar el procesamiento desde el punto en el que se detuvo una canalización que se estaba ejecutando. El valor predeterminado es un nombre generado automáticamente. Consulte los registros de trabajos de Dataflow para ver el valor utilizado.
- bigtableChangeStreamResume si se define como
true
, una nueva canalización reanuda el procesamiento desde el punto en el que se detuvo una canalización que se estaba ejecutando anteriormente con el mismo valor debigtableChangeStreamName
. Si la canalización con el valorbigtableChangeStreamName
proporcionado no se ha ejecutado nunca, no se iniciará ninguna canalización. Si se define comofalse
, se inicia una nueva canalización. Si ya se ha ejecutado una canalización con el mismo valor debigtableChangeStreamName
para la fuente en cuestión, no se iniciará una nueva canalización. El valor predeterminado esfalse
. - bigtableReadChangeStreamTimeoutMs tiempo de espera de las solicitudes ReadChangeStream de Bigtable en milisegundos.
- bigtableReadProjectId el ID del proyecto de Bigtable. El valor predeterminado es el proyecto de la tarea de Dataflow.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Bigtable change streams to Pub/Sub template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
BIGTABLE_INSTANCE_ID
: el ID de tu instancia de Bigtable.BIGTABLE_TABLE_ID
: el ID de tu tabla de Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: el ID de tu perfil de aplicación de Bigtable.PUBSUB_TOPIC
: el nombre del tema de destino de Pub/Sub
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
BIGTABLE_INSTANCE_ID
: el ID de tu instancia de Bigtable.BIGTABLE_TABLE_ID
: el ID de tu tabla de Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: el ID de tu perfil de aplicación de Bigtable.PUBSUB_TOPIC
: el nombre del tema de destino de Pub/Sub
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.