Plantilla de flujos de cambios de Bigtable a BigQuery

La plantilla de flujos de cambios de Bigtable a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Bigtable y los escribe en tablas de BigQuery a través de Dataflow.

Un flujo de cambios de Bigtable te permite suscribirte a las mutaciones de los datos por tabla. Cuando te suscribes a transmisiones de cambios de tablas, se aplican las siguientes restricciones:

  • Solo se muestran las celdas y los descriptores modificados de las operaciones de eliminación.
  • Solo se muestra el valor nuevo de una celda modificada.

Cuando los registros de cambios de datos se escriben en BigQuery, las filas pueden insertarse de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Bigtable.

Las filas de la tabla de registro de cambios que no se pueden escribir en BigQuery debido a un error persistente se colocan de forma permanente en un directorio de cola de mensajes no entregados (cola de mensajes no procesados) en Cloud Storage para su revisión manual o un procesamiento posterior por parte del usuario.

Si la tabla de BigQuery necesaria no existe, la canalización la crea. De lo contrario, se usa una tabla de BigQuery existente. El esquema de las tablas de BigQuery existentes debe contener las columnas de la siguiente tabla.

Cada fila nueva de BigQuery incluye un registro de cambios de datos que muestra la transmisión de cambios de su fila correspondiente en tu tabla de Bigtable.

Esquema de la tabla de salida de BigQuery

Nombre de la columna Tipo Anulable Descripción
row_key STRING o BYTES No La clave de fila de la fila modificada. Cuando la opción de canalización writeRowkeyAsBytes se establece en true, el tipo de columna debe ser BYTES. De lo contrario, usa el tipo STRING.
mod_type STRING No El tipo de mutación de la fila. Usa uno de los siguientes valores: SET_CELL, DELETE_CELLS o DELETE_FAMILY.
column_family STRING No La familia de columnas afectada por la mutación de la fila.
column STRING El calificador de columna afectado por la mutación de la fila. Para el tipo de mutación DELETE_FAMILY, configúralo como NULL.
commit_timestamp TIMESTAMP No El momento en que Bigtable aplica la mutación.
big_query_commit_timestamp TIMESTAMP Especifica el tiempo en que BigQuery escribe la fila en una tabla de salida (opcional). El campo no se propaga si el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP o INT64 El valor de la marca de tiempo de la celda afectada por la mutación. Cuando la opción de canalización writeNumericTimestamps se establece en true, el tipo de columna debe ser INT64. De lo contrario, usa el tipo TIMESTAMP. Para los tipos de mutaciones DELETE_CELLS y DELETE_FAMILY, se configuran en NULL.
timestamp_from TIMESTAMP o INT64 Describe un inicio inclusivo del intervalo de marcas de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, configúralo como NULL.
timestamp_to TIMESTAMP o INT64 Describe un final exclusivo del intervalo de marca de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, configúralo como NULL.
is_gc BOOL No Opcional: Cuando la mutación se activa mediante una política de recolección de elementos no utilizados, configúralo como true. En todos los demás casos, configúralo como false. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_instance STRING No Opcional: Describe el nombre de la instancia de Bigtable de la que proviene la mutación. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING No Opcional: Describe el nombre del clúster de Bigtable del que proviene la mutación. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_table STRING No Opcional: Describe el nombre de la tabla de Bigtable a la que se aplica la mutación. El valor en esta columna puede ser útil si varias tablas de Bigtable transmiten cambios a la misma tabla de BigQuery. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 No Opcional: Cuando dos clústeres de Bigtable registran dos mutaciones al mismo tiempo, se aplica a la tabla de origen la mutación con el valor tiebreaker más alto. Se descartan las mutaciones con valores de tiebreaker más bajos. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
value STRING o BYTES Es el nuevo valor que establece la mutación. Cuando la opción de canalización writeValuesAsBytes se establece en true, el tipo de columna debe ser BYTES. De lo contrario, usa el tipo STRING. El valor se establece para las mutaciones SET_CELL. Para otros tipos de mutaciones, el valor se establece en NULL.

Requisitos de la canalización

  • La instancia de origen de Bigtable especificada.
  • La tabla de origen de Bigtable especificada. La tabla debe tener las transmisiones de cambios habilitadas.
  • El perfil de aplicación de Bigtable especificado.
  • El conjunto de datos de destino de BigQuery especificado.

Parámetros de la plantilla

Parámetros obligatorios

  • bigQueryDataset: Es el nombre del conjunto de datos de la tabla de BigQuery de destino.
  • bigtableChangeStreamAppProfile: El ID de perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
  • bigtableReadInstanceId: El ID de la instancia de Bigtable de origen.
  • bigtableReadTableId: El ID de la tabla de Bigtable de origen.

Parámetros opcionales

  • writeRowkeyAsBytes: Indica si se deben escribir claves de fila como BYTES de BigQuery. Cuando se establece en true, las claves de fila se escriben en la columna BYTES. De lo contrario, las claves de fila se escriben en la columna STRING. La configuración predeterminada es false.
  • writeValuesAsBytes: Cuando se establece en true, los valores se escriben en una columna de tipo BYTES, de lo contrario, en una columna de tipo STRING . La configuración predeterminada es: false.
  • writeNumericTimestamps: Indica si se debe escribir la marca de tiempo de Bigtable como INT64 de BigQuery. Cuando se establece en true, los valores se escriben en la columna INT64. De lo contrario, los valores se escriben en la columna TIMESTAMP. Columnas afectadas: timestamp, timestamp_from y timestamp_to. La configuración predeterminada es false. Cuando se establece en true, la hora se mide en microsegundos desde el tiempo Unix (1 de enero de 1970 en UTC).
  • bigQueryProjectId: Es el ID del proyecto del conjunto de datos de BigQuery. El valor predeterminado es el proyecto para el trabajo de Dataflow.
  • bigQueryChangelogTableName: Es el nombre de la tabla de BigQuery de destino. Si no se especifica, se usa el valor bigtableReadTableId + "_changelog". La configuración predeterminada es vacía.
  • bigQueryChangelogTablePartitionGranularity: Especifica un nivel de detalle para particionar la tabla de registro de cambios. Cuando se configura, la tabla está particionada. Usa uno de los siguientes valores admitidos: HOUR, DAY, MONTH o YEAR. De forma predeterminada, la tabla no está particionada.
  • bigQueryChangelogTablePartitionExpirationMs: Configura el tiempo de vencimiento de la partición de la tabla de registros de cambios en milisegundos. Cuando se establece en true, se borran las particiones que son más antiguas que la cantidad especificada de milisegundos. De forma predeterminada, no se establece ningún vencimiento.
  • bigQueryChangelogTableFieldsToIgnore: Una lista separada por comas de las columnas de registro de cambios que no se crean ni se propagan (opcional). Usa uno de los siguientes valores admitidos: is_gc, source_instance, source_cluster, source_table, tiebreaker o big_query_commit_timestamp. De forma predeterminada, se propagan todas las columnas.
  • dlqDirectory: Es el directorio que se usará para la cola de mensajes no entregados. Los registros que no se pueden procesar se almacenan en este directorio. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. En la mayoría de los casos, puedes usar la ruta de acceso predeterminada.
  • bigtableChangeStreamMetadataInstanceId: El ID de la instancia de metadatos de flujos de cambios de Bigtable. La configuración predeterminada es vacía.
  • bigtableChangeStreamMetadataTableTableId: El ID de la tabla de metadatos del conector de flujos de cambios de Bigtable. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Bigtable se crea de forma automática durante la ejecución de la canalización. La configuración predeterminada es vacía.
  • bigtableChangeStreamCharset: El nombre del conjunto de caracteres de los flujos de cambios de Bigtable. La configuración predeterminada es UTF-8.
  • bigtableChangeStreamStartTimestamp: La marca de tiempo de inicio (https://tools.ietf.org/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, 2022-05-05T07:59:59Z. El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización.
  • bigtableChangeStreamIgnoreColumnFamilies: Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional). La configuración predeterminada es vacía.
  • bigtableChangeStreamIgnoreColumns: Una lista separada por comas de los cambios en el nombre de la columna que se deben ignorar. La configuración predeterminada es vacía.
  • bigtableChangeStreamName: Un nombre único para la canalización del cliente. Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. El valor predeterminado es un nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
  • bigtableChangeStreamResume: Cuando se establece en true, una canalización nueva reanuda el procesamiento desde el momento en que se detuvo una canalización que se ejecutaba antes con el mismo valor bigtableChangeStreamName. Si nunca se ejecuta la canalización con el valor bigtableChangeStreamName determinado, no se inicia una canalización nueva. Cuando se establece en false, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valor bigtableChangeStreamName para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada es false.
  • bigtableReadProjectId: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto para el trabajo de Dataflow.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable change streams to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • BIGQUERY_DESTINATION_DATASET: Es el nombre del conjunto de datos de destino de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • BIGQUERY_DESTINATION_DATASET: Es el nombre del conjunto de datos de destino de BigQuery.

¿Qué sigue?