Plantilla de flujos de cambios de Bigtable a Pub/Sub

La plantilla de flujos de cambios de Bigtable a Pub/Sub es una canalización de transmisión que transmite los registros de cambios de datos de Bigtable y los publica en un tema de Pub/Sub con Dataflow.

Un flujo de cambios de Bigtable te permite suscribirte a las mutaciones de los datos por tabla. Cuando te suscribes a transmisiones de cambios de tablas, se aplican las siguientes restricciones:

  • Solo se muestran las celdas y los descriptores modificados de las operaciones de eliminación.
  • Solo se muestra el valor nuevo de una celda modificada.

Cuando los registros de cambios de datos se publican en un tema de Pub/Sub, los mensajes se pueden insertar de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Bigtable.

Los registros de cambios de datos de Bigtable que no se pueden publicar en temas de Pub/Sub se colocan temporalmente en un directorio de cola de mensajes no entregados (cola de mensajes no procesados) en Cloud Storage. Después de la cantidad máxima de reintentos fallidos, estos registros se colocan de forma indefinida en el mismo directorio de cola de mensajes no entregados para que el usuario los revise o los procese.

La canalización requiere que el tema de Pub/Sub de destino exista. El tema de destino puede configurarse para validar mensajes con un esquema. Cuando un tema de Pub/Sub especifica un esquema, la canalización solo se inicia si el esquema es válido. Según el tipo de esquema, usa una de las siguientes definiciones de esquema para el tema de destino:

Protocol Buffers

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Usa el siguiente esquema de Protobuf con codificación de mensajes JSON:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Cada mensaje de Pub/Sub nuevo incluye un registro de cambios de datos que el flujo de cambios muestra a partir de su fila correspondiente en tu tabla de Bigtable.

Descripción del mensaje de salida de Pub/Sub

Nombre del campo Descripción
rowKey La clave de la fila modificada. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, las claves de fila se muestran como strings. Cuando se especifica useBase64Rowkeys, las claves de fila se codifican en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de clave de fila en una string.
modType El tipo de mutación de la fila. Usa uno de los siguientes valores: SET_CELL, DELETE_CELLS o DELETE_FAMILY.
columnFamily La familia de columnas afectada por la mutación de la fila.
column El calificador de columna afectado por la mutación de la fila. Para el tipo de mutación DELETE_FAMILY, no se establece el campo de la columna. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, las columnas se muestran como strings. Cuando se especifica useBase64ColumnQualifier, el campo de la columna está codificado en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de clave de fila en una string.
commitTimestamp El momento en que Bigtable aplica la mutación. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC).
timestamp El valor de la marca de tiempo de la celda afectada por la mutación. Para los tipos de mutación DELETE_CELLS y DELETE_FAMILY, no se configuró la marca de tiempo. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC).
timestampFrom Describe un inicio inclusivo del intervalo de marcas de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, timestampFrom no está configurado. El tiempo se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC).
timestampTo Describe un final exclusivo del intervalo de marca de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, timestampTo no está configurado.
isGC Un valor booleano que indica si la mutación se genera a través de un mecanismo de recolección de elementos no utilizados de Bigtable.
tieBreaker Cuando diferentes clústeres de Bigtable registran dos mutaciones al mismo tiempo, se aplica a la tabla de origen la mutación con el valor tiebreaker más alto. Se descartan las mutaciones con valores de tiebreaker más bajos.
value Es el nuevo valor que establece la mutación. A menos que se configure la opción de canalización stripValues, el valor se establece para las mutaciones SET_CELL. Para otros tipos de mutación, el valor no está configurado. Llega como un array de bytes. Cuando se configura la codificación de mensajes JSON, los valores se muestran como strings. Cuando se especifica useBase64Values, el valor está codificado en Base64. De lo contrario, se usa un charset especificado por bigtableChangeStreamCharset para decodificar los bytes de valor en una string.
sourceInstance El nombre de la instancia de Bigtable que registró la mutación. Puede ser cuando varias canalizaciones transmiten cambios de instancias diferentes al mismo tema de Pub/Sub.
sourceCluster El nombre del clúster de Bigtable que registró la mutación. Se puede usar cuando varias canalizaciones transmiten cambios de instancias diferentes al mismo tema de Pub/Sub.
sourceTable El nombre de la tabla de Bigtable que recibió la mutación. Se puede usar en caso de que varias canalizaciones transmitan cambios de una tabla diferente al mismo tema de Pub/Sub.

Requisitos de la canalización

  • La instancia de origen de Bigtable especificada.
  • La tabla de origen de Bigtable especificada. La tabla debe tener las transmisiones de cambios habilitadas.
  • El perfil de aplicación de Bigtable especificado.
  • El tema de Pub/Sub especificado debe existir.

Parámetros de la plantilla

Parámetro Descripción
bigtableReadInstanceId El ID de la instancia de Bigtable de origen.
bigtableReadTableId El ID de la tabla de origen de Bigtable.
bigtableChangeStreamAppProfile El ID de perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
pubSubTopic El nombre del tema de Pub/Sub de destino.
messageFormat Opcional: Cuando el tema de destino tiene configurado un esquema, el formato del mensaje se determina mediante el esquema y la codificación configurados. El formato de los mensajes que se publicarán en el tema de Pub/Sub. Valores admitidos: AVRO, PROTOCOL_BUFFERS y JSON. La configuración predeterminada es JSON. Cuando se usa el formato JSON, los campos rowKey, columna y valor del mensaje son cadenas, cuyo contenido se determina mediante las opciones de canalización useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values y bigtableChangeStreamCharset.
messageEncoding Opcional: Cuando el tema de destino tiene configurado un esquema, la codificación del mensaje se determina mediante la configuración del tema. La codificación de los mensajes que se publicarán en el tema de Pub/Sub. Valores admitidos: BINARY, JSON y JSON. La configuración predeterminada es JSON.
stripValues Opcional: Cuando se establece en true, las mutaciones SET_CELL se muestran sin valores nuevos establecidos. La configuración predeterminada es "false". Este parámetro es útil cuando no necesitas que haya un valor nuevo, también conocido como invalidación de caché, o cuando los valores sean extremadamente grandes y excedan los límites de tamaño de los mensajes de Pub/Sub.
bigtableReadProjectId Opcional: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto del trabajo de Dataflow.
pubSubProjectId Opcional: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto del trabajo de Dataflow.
bigtableChangeStreamMetadataInstanceId Opcional: El ID de la instancia de metadatos de transmisión de cambios de Bigtable.
bigtableChangeStreamMetadataTableTableId Opcional: El ID de la tabla de metadatos de flujos de cambios de Bigtable.
bigtableChangeStreamCharset Opcional: El cambio de Bigtable transmite el nombre del charset cuando se leen claves de filas, valores y calificadores de columnas. Esta opción se usa cuando la codificación de mensajes es JSON.
bigtableChangeStreamStartTimestamp Opcional: La marca de tiempo de inicio, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, 2022-05-05T07:59:59Z El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización.
bigtableChangeStreamIgnoreColumnFamilies Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional).
bigtableChangeStreamIgnoreColumns Una lista separada por comas de los cambios en el nombre de la columna (opcional).
bigtableChangeStreamName Un nombre único para la canalización del cliente (opcional). Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. El valor predeterminado es el nombre generado automáticamente. Para encontrar el valor, consulta los registros del trabajo de Dataflow.
bigtableChangeStreamResume Opcional: Cuando se establece en true, una canalización nueva reanuda el procesamiento desde el momento en que se detiene una canalización que se ejecutaba antes con el mismo valor bigtableChangeStreamName. Si nunca se ejecuta la canalización con el valor bigtableChangeStreamName determinado, no se inicia una canalización nueva. Cuando se establece en false, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valor bigtableChangeStreamName para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada es false.
useBase64Rowkeys Opcional: Se usa con la codificación de mensajes JSON. Cuando se establece en true, el campo rowKey es una string codificada en base64. De lo contrario, la rowKey se produce mediante el uso de bigtableChangeStreamCharset para decodificar bytes en una string. El valor predeterminado es false.
useBase64ColumnQualifiers Opcional: Se usa con la codificación de mensajes JSON. Cuando se establece en true, el campo column es una string codificada en base64. De lo contrario, la columna se produce mediante bigtableChangeStreamCharset para decodificar bytes en una string. La configuración predeterminada es false.
useBase64Values Opcional: Se usa con la codificación de mensajes JSON. Cuando se establece en true, el campo value es una string codificada en base64. De lo contrario, el valor se produce mediante bigtableChangeStreamCharset para decodificar bytes en una string. La configuración predeterminada es false.
dlqMaxRetries Opcional: El máximo de reintentos de mensajes no entregados. La configuración predeterminada es 5.
dlqRetryMinutes Opcional: La cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.
dlqDirectory Opcional: El directorio para la cola de mensajes no entregados. Los registros que no se pueden procesar se almacenan en este directorio. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. En la mayoría de los casos, puedes usar la ruta de acceso predeterminada.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable change streams to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • PUBSUB_TOPIC: Es el nombre del tema de destino de Pub/Sub.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • PUBSUB_TOPIC: Es el nombre del tema de destino de Pub/Sub.

¿Qué sigue?