La plantilla de flujos de cambios de Bigtable a Pub/Sub es una canalización de transmisión que transmite los registros de cambios de datos de Bigtable y los publica en un tema de Pub/Sub con Dataflow.
Un flujo de cambios de Bigtable te permite suscribirte a las mutaciones de los datos por tabla. Cuando te suscribes a transmisiones de cambios de tablas, se aplican las siguientes restricciones:
- Solo se muestran las celdas y los descriptores modificados de las operaciones de eliminación.
- Solo se muestra el valor nuevo de una celda modificada.
Cuando los registros de cambios de datos se publican en un tema de Pub/Sub, los mensajes se pueden insertar de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Bigtable.
Los registros de cambios de datos de Bigtable que no se pueden publicar en temas de Pub/Sub se colocan temporalmente en un directorio de cola de mensajes no entregados (cola de mensajes no procesados) en Cloud Storage. Después de la cantidad máxima de reintentos fallidos, estos registros se colocan de forma indefinida en el mismo directorio de cola de mensajes no entregados para que el usuario los revise o los procese.
La canalización requiere que el tema de Pub/Sub de destino exista. El tema de destino puede configurarse para validar mensajes con un esquema. Cuando un tema de Pub/Sub especifica un esquema, la canalización solo se inicia si el esquema es válido. Según el tipo de esquema, usa una de las siguientes definiciones de esquema para el tema de destino:
Protocol Buffers
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Usa el siguiente esquema de Protobuf con codificación de mensajes JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Cada mensaje de Pub/Sub nuevo incluye una entrada de un registro de cambios de datos que muestra el flujo de cambios de su fila correspondiente en tu tabla de Bigtable. La plantilla de Pub/Sub aplana las entradas de cada registro de cambios y las convierte en cambios individuales a nivel de celda.
Descripción del mensaje de salida de Pub/Sub
Nombre del campo | Descripción |
---|---|
rowKey |
La clave de la fila modificada. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, las claves de fila se muestran como strings. Cuando se especifica useBase64Rowkeys , las claves de fila se codifican en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de clave de fila en una string. |
modType |
El tipo de mutación de la fila. Usa uno de los siguientes valores: SET_CELL , DELETE_CELLS o DELETE_FAMILY . |
columnFamily |
La familia de columnas afectada por la mutación de la fila. |
column |
El calificador de columna afectado por la mutación de la fila. Para el tipo de mutación DELETE_FAMILY , no se establece el campo de la columna. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, las columnas se muestran como strings. Cuando se especifica useBase64ColumnQualifier , el campo de la columna está codificado en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de clave de fila en una string. |
commitTimestamp |
El momento en que Bigtable aplica la mutación. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC). |
timestamp |
El valor de la marca de tiempo de la celda afectada por la mutación. Para los tipos de mutación DELETE_CELLS y DELETE_FAMILY , no se configuró la marca de tiempo. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC). |
timestampFrom |
Describe un inicio inclusivo del intervalo de marcas de tiempo para todas las celdas que borró la mutación DELETE_CELLS . Para otros tipos de mutaciones, timestampFrom no está configurado. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC). |
timestampTo |
Describe un final exclusivo del intervalo de marca de tiempo para todas las celdas que borró la mutación DELETE_CELLS . Para otros tipos de mutaciones, timestampTo no está configurado. |
isGC |
Un valor booleano que indica si la mutación se genera a través de un mecanismo de recolección de elementos no utilizados de Bigtable. |
tieBreaker |
Cuando diferentes clústeres de Bigtable registran dos mutaciones al mismo tiempo, se aplica a la tabla de origen la mutación con el valor tiebreaker más alto. Se descartan las mutaciones con valores de tiebreaker más bajos. |
value |
Es el nuevo valor que establece la mutación. A menos que se configure la opción de canalización stripValues , el valor se establece para las mutaciones SET_CELL . Para otros tipos de mutación, el valor no está configurado. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, los valores se muestran como strings.
Cuando se especifica useBase64Values , el valor está codificado en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de valor en una string. |
sourceInstance |
El nombre de la instancia de Bigtable que registró la mutación. Puede ser cuando varias canalizaciones transmiten cambios de instancias diferentes al mismo tema de Pub/Sub. |
sourceCluster |
El nombre del clúster de Bigtable que registró la mutación. Se puede usar cuando varias canalizaciones transmiten cambios de instancias diferentes al mismo tema de Pub/Sub. |
sourceTable |
El nombre de la tabla de Bigtable que recibió la mutación. Se puede usar en caso de que varias canalizaciones transmitan cambios de una tabla diferente al mismo tema de Pub/Sub. |
Requisitos de la canalización
- La instancia de origen de Bigtable especificada.
- La tabla de origen de Bigtable especificada. La tabla debe tener las transmisiones de cambios habilitadas.
- El perfil de aplicación de Bigtable especificado.
- El tema de Pub/Sub especificado debe existir.
Parámetros de la plantilla
Parámetros obligatorios
- pubSubTopic: Es el nombre del tema de Pub/Sub de destino.
- bigtableChangeStreamAppProfile: El ID de perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
- bigtableReadInstanceId: El ID de la instancia de Bigtable de origen.
- bigtableReadTableId: El ID de la tabla de Bigtable de origen.
Parámetros opcionales
- messageEncoding: Es la codificación de los mensajes que se publicarán en el tema de Pub/Sub. Cuando se configura el esquema del tema de destino, la codificación del mensaje se determina mediante la configuración del tema. Se admiten los siguientes valores:
BINARY
yJSON
. La configuración predeterminada esJSON
. - messageFormat: Es la codificación de los mensajes que se publicarán en el tema de Pub/Sub. Cuando se configura el esquema del tema de destino, la codificación del mensaje se determina mediante la configuración del tema. Se admiten los siguientes valores:
AVRO
,PROTOCOL_BUFFERS
yJSON
. El valor predeterminado esJSON
. Cuando se usa el formatoJSON
, los campos rowKey, columna y valor del mensaje son cadenas, cuyo contenido se determina mediante las opciones de canalizaciónuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
ybigtableChangeStreamCharset
- stripValues: Cuando se establece en
true
, las mutacionesSET_CELL
se muestran sin valores nuevos establecidos. La configuración predeterminada esfalse
. Este parámetro es útil cuando no necesitas que haya un valor nuevo, también conocido como invalidación de caché, o cuando los valores sean extremadamente grandes y excedan los límites de tamaño de los mensajes de Pub/Sub. - dlqDirectory: Es el directorio de la cola de mensajes no entregados. Los registros que no se pueden procesar se almacenan en este directorio. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. En la mayoría de los casos, puedes usar la ruta de acceso predeterminada.
- dlqRetryMinutes: La cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es
10
. - dlqMaxRetries: Es el máximo de reintentos de mensajes no entregados. La configuración predeterminada es
5
. - useBase64Rowkeys: Se usa con la codificación de mensajes JSON. Cuando se establece en
true
, el camporowKey
es una cadena codificada en base64. De lo contrario, larowKey
se produce mediante el uso debigtableChangeStreamCharset
para decodificar bytes en una cadena. La configuración predeterminada esfalse
. - pubSubProjectId: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto del trabajo de Dataflow.
- useBase64ColumnQualifiers: Se usa con la codificación de mensajes JSON. Cuando se establece en
true
, el campocolumn
es una cadena codificada en base64. De lo contrario, la columna se produce mediantebigtableChangeStreamCharset
para decodificar bytes en una cadena. El valor predeterminado esfalse
. - useBase64Values: Se usa con la codificación de mensajes JSON. Cuando se establece en
true
, el campo valor es una cadena codificada en base64. De lo contrario, el valor se produce mediantebigtableChangeStreamCharset
para decodificar bytes en una cadena. La configuración predeterminada esfalse
. - disableDlqRetries: Indica si se deben inhabilitar o no los reintentos para la DLQ. La configuración predeterminada es "false".
- bigtableChangeStreamMetadataInstanceId: El ID de la instancia de metadatos de flujos de cambios de Bigtable. La configuración predeterminada es vacía.
- bigtableChangeStreamMetadataTableTableId: El ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Bigtable se crea de forma automática durante la ejecución de la canalización. La configuración predeterminada es vacía.
- bigtableChangeStreamCharset: El nombre del conjunto de caracteres de los flujos de cambios de Bigtable. La configuración predeterminada es UTF-8.
- bigtableChangeStreamStartTimestamp: La marca de tiempo de inicio (https://tools.ietf.org/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios. Por ejemplo,
2022-05-05T07:59:59Z
. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización. - bigtableChangeStreamIgnoreColumnFamilies: Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional). La configuración predeterminada es vacía.
- bigtableChangeStreamIgnoreColumns: Una lista separada por comas de los cambios en el nombre de la columna que se deben ignorar. La configuración predeterminada es vacía.
- bigtableChangeStreamName: Un nombre único para la canalización del cliente. Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
- bigtableChangeStreamResume: Cuando se establece en
true
, una canalización nueva reanuda el procesamiento desde el momento en que se detuvo una canalización que se ejecutaba antes con el mismo valorbigtableChangeStreamName
. Si nunca se ejecuta la canalización con el valorbigtableChangeStreamName
determinado, no se inicia una canalización nueva. Cuando se establece enfalse
, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valorbigtableChangeStreamName
para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada esfalse
. - bigtableReadProjectId: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto para el trabajo de Dataflow.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable change streams to Pub/Sub template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
REGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
BIGTABLE_INSTANCE_ID
: tu ID de instancia de Bigtable.BIGTABLE_TABLE_ID
: tu ID de tabla de Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: Es el ID de perfil de la aplicación de Bigtable.PUBSUB_TOPIC
: Es el nombre del tema de destino de Pub/Sub.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
LOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
BIGTABLE_INSTANCE_ID
: tu ID de instancia de Bigtable.BIGTABLE_TABLE_ID
: tu ID de tabla de Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: Es el ID de perfil de la aplicación de Bigtable.PUBSUB_TOPIC
: Es el nombre del tema de destino de Pub/Sub.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.