Plantilla de flujos de cambios de Bigtable a BigQuery

La plantilla de flujos de cambios de Bigtable a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Bigtable y los escribe en tablas de BigQuery a través de Dataflow.

Un flujo de cambios de Bigtable te permite suscribirte a las mutaciones de los datos por tabla. Cuando te suscribes a transmisiones de cambios de tablas, se aplican las siguientes restricciones:

  • Solo se muestran las celdas y los descriptores modificados de las operaciones de eliminación.
  • Solo se muestra el valor nuevo de una celda modificada.

Cuando los registros de cambios de datos se escriben en BigQuery, las filas pueden insertarse de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Bigtable.

Las filas de la tabla de registro de cambios que no se pueden escribir en BigQuery debido a un error persistente se colocan de forma permanente en un directorio de cola de mensajes no entregados (cola de mensajes no procesados) en Cloud Storage para su revisión manual o un procesamiento posterior por parte del usuario.

Si la tabla de BigQuery necesaria no existe, la canalización la crea. De lo contrario, se usa una tabla de BigQuery existente. El esquema de las tablas de BigQuery existentes debe contener las columnas de la siguiente tabla.

Cada fila nueva de BigQuery incluye un registro de cambios de datos que muestra la transmisión de cambios de su fila correspondiente en tu tabla de Bigtable.

Esquema de la tabla de salida de BigQuery

Nombre de la columna Tipo Anulable Descripción
row_key STRING o BYTES No. La clave de fila de la fila modificada. Cuando la opción de canalización writeRowkeyAsBytes se establece en true, el tipo de columna debe ser BYTES. De lo contrario, usa el tipo STRING.
mod_type STRING No. El tipo de mutación de la fila. Usa uno de los siguientes valores: SET_CELL, DELETE_CELLS o DELETE_FAMILY.
column_family STRING No. La familia de columnas afectada por la mutación de la fila.
column STRING El calificador de columna afectado por la mutación de la fila. Para el tipo de mutación DELETE_FAMILY, configúralo como NULL.
commit_timestamp TIMESTAMP No. El momento en que Bigtable aplica la mutación.
big_query_commit_timestamp TIMESTAMP Especifica el tiempo en que BigQuery escribe la fila en una tabla de salida (opcional). El campo no se propaga si el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP o INT64 El valor de la marca de tiempo de la celda afectada por la mutación. Cuando la opción de canalización writeNumericTimestamps se establece en true, el tipo de columna debe ser INT64. De lo contrario, usa el tipo TIMESTAMP. Para los tipos de mutaciones DELETE_CELLS y DELETE_FAMILY, se configuran en NULL.
timestamp_from TIMESTAMP o INT64 Describe un inicio inclusivo del intervalo de marcas de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, configúralo como NULL.
timestamp_to TIMESTAMP o INT64 Describe un final exclusivo del intervalo de marca de tiempo para todas las celdas que borró la mutación DELETE_CELLS. Para otros tipos de mutaciones, configúralo como NULL.
is_gc BOOL No. Opcional: Cuando la mutación se activa mediante una política de recolección de elementos no utilizados, configúralo como true. En todos los demás casos, configúralo como false. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_instance STRING No. Opcional: Describe el nombre de la instancia de Bigtable de la que proviene la mutación. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING No. Opcional: Describe el nombre del clúster de Bigtable del que proviene la mutación. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
source_table STRING No. Opcional: Describe el nombre de la tabla de Bigtable a la que se aplica la mutación. El valor en esta columna puede ser útil si varias tablas de Bigtable transmiten cambios a la misma tabla de BigQuery. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 No. Opcional: Cuando dos clústeres de Bigtable registran dos mutaciones al mismo tiempo, se aplica a la tabla de origen la mutación con el valor tiebreaker más alto. Se descartan las mutaciones con valores de tiebreaker más bajos. El campo no se propaga cuando el nombre de la columna está presente en el valor de la opción de canalización bigQueryChangelogTableFieldsToIgnore.
value STRING o BYTES Es el nuevo valor que establece la mutación. Cuando la opción de canalización writeValuesAsBytes se establece en true, el tipo de columna debe ser BYTES. De lo contrario, usa el tipo STRING. El valor se establece para las mutaciones SET_CELL. Para otros tipos de mutaciones, el valor se establece en NULL.

Requisitos de la canalización

  • La instancia de origen de Bigtable especificada.
  • La tabla de origen de Bigtable especificada. La tabla debe tener las transmisiones de cambios habilitadas.
  • El perfil de aplicación de Bigtable especificado.
  • El conjunto de datos de destino de BigQuery especificado.

Parámetros de la plantilla

Parámetro Descripción
bigtableReadInstanceId El ID de la instancia de Bigtable de origen.
bigtableReadTableId El ID de la tabla de origen de Bigtable.
bigtableChangeStreamAppProfile El ID de perfil de la aplicación de Bigtable. El perfil de aplicación debe usar el enrutamiento de un solo clúster y permitir las transacciones de fila única.
bigQueryDataset El nombre del conjunto de datos de la tabla de BigQuery de destino.
writeNumericTimestamps Opcional: Escribe la marca de tiempo de Bigtable como INT64 de BigQuery. Cuando se establece en true, los valores se escriben en la columna INT64. De lo contrario, los valores se escriben en la columna TIMESTAMP. Columnas afectadas: timestamp, timestamp_from y timestamp_to. La configuración predeterminada es false. Cuando se establece en true, la hora se mide en microsegundos desde la época Unix (1 de enero de 1970 en UTC).
writeRowkeyAsBytes Opcional: Escribe claves de fila como BigQuery BYTES. Cuando se establece en true, las claves de fila se escriben en la columna BYTES. De lo contrario, las claves de fila se escriben en la columna STRING. El valor predeterminado es false.
writeValuesAsBytes Opcional: Escribe valores como BYTES de BigQuery. Cuando se establece en true, los valores se escriben en la columna BYTES. De lo contrario, los valores se escriben en la columna STRING. El valor predeterminado es false.
bigQueryChangelogTableName Opcional: Nombre de la tabla de destino de BigQuery. Si no se especifica, se usa el valor bigtableReadTableId + "_changelog".
bigQueryProjectId Opcional: El ID del proyecto del conjunto de datos de BigQuery. El valor predeterminado es el proyecto para el trabajo de Dataflow.
bigtableReadProjectId Opcional: El ID del proyecto de Bigtable. El valor predeterminado es el proyecto para el trabajo de Dataflow.
bigtableChangeStreamMetadataInstanceId Opcional: El ID de la instancia de metadatos de transmisión de cambios de Bigtable.
bigtableChangeStreamMetadataTableTableId Opcional: El ID de la tabla de metadatos de flujos de cambios de Bigtable.
bigtableChangeStreamCharset Opcional: El cambio de Bigtable transmite el nombre del charset cuando se leen valores y calificadores de columnas.
bigtableChangeStreamStartTimestamp Opcional: La marca de tiempo de inicio, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, 2022-05-05T07:59:59Z El valor predeterminado es la marca de tiempo de la hora de inicio de la canalización.
bigtableChangeStreamIgnoreColumnFamilies Una lista separada por comas de los cambios en el nombre de la familia de columnas que se deben ignorar (opcional).
bigtableChangeStreamIgnoreColumns Una lista separada por comas de los cambios en el nombre de la columna (opcional).
bigtableChangeStreamName Un nombre único para la canalización del cliente (opcional). Te permite reanudar el procesamiento desde el momento en que se detuvo una canalización que estaba en ejecución. La configuración predeterminada es el nombre generado automáticamente. Consulta los registros de trabajos de Dataflow para el valor usado.
bigtableChangeStreamResume Opcional: Cuando se establece en true, una canalización nueva reanuda el procesamiento desde el momento en que se detiene una canalización que se ejecutaba antes con el mismo valor bigtableChangeStreamName. Si nunca se ejecuta la canalización con el valor bigtableChangeStreamName determinado, no se inicia una canalización nueva. Cuando se establece en false, se inicia una canalización nueva. Si ya se ejecutó una canalización con el mismo valor bigtableChangeStreamName para una fuente determinada, no se inicia una canalización nueva. La configuración predeterminada es false.
bigQueryChangelogTableFieldsToIgnore Una lista separada por comas de las columnas de registro de cambios que no se crean y se propagan (opcional). Usa uno de los siguientes valores admitidos: is_gc, source_instance, source_cluster, source_table, tiebreaker or big_query_commit_timestamp. De forma predeterminada, se propagan todas las columnas.
bigQueryChangelogTablePartitionExpirationMs Opcional: Configura el tiempo de vencimiento de la partición de la tabla de registros de cambios, en milisegundos. Cuando se establece en true, se borran las particiones que son más antiguas que la cantidad especificada de milisegundos. De forma predeterminada, no se establece ningún vencimiento.
bigQueryChangelogTablePartitionGranularity Opcional: Especifica un nivel de detalle para particionar la tabla de registro de cambios. Cuando se configura, la tabla está particionada. Usa uno de los siguientes valores admitidos: HOUR, DAY, MONTH o YEAR. De forma predeterminada, la tabla no está particionada.
dlqDirectory Opcional: El directorio para la cola de mensajes no entregados. Los registros que no se pueden procesar se almacenan en este directorio. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. En la mayoría de los casos, puedes usar la ruta de acceso predeterminada.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable change streams to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • BIGQUERY_DESTINATION_DATASET: Es el nombre del conjunto de datos de destino de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_INSTANCE_ID: tu ID de instancia de Bigtable.
  • BIGTABLE_TABLE_ID: tu ID de tabla de Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: Es el ID de perfil de la aplicación de Bigtable.
  • BIGQUERY_DESTINATION_DATASET: Es el nombre del conjunto de datos de destino de BigQuery.

¿Qué sigue?