En esta página, aprenderá a usar la API Datastream para hacer lo siguiente:
- Crear flujos
- Obtener información sobre las emisiones y los objetos de emisión
- Actualizar las secuencias iniciándolas, pausándolas, reanudándolas y modificándolas, así como iniciando y deteniendo el relleno de objetos de la secuencia
- Recuperar emisiones que han fallado permanentemente
- Habilitar la transmisión de objetos grandes para flujos de Oracle
- Eliminar flujos
Hay dos formas de usar la API DataStream. Puedes hacer llamadas a la API REST o usar Google Cloud CLI.
Para obtener información general sobre cómo usar Google Cloud CLI
para gestionar flujos de Datastream, consulta Flujos de Datastream de gcloud CLI.
Crear un flujo
En esta sección, aprenderá a crear un flujo que se usa para transferir datos de una fuente a un destino. Los ejemplos que se muestran a continuación no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso práctico específico, usa estos ejemplos junto con la documentación de referencia de la API Datastream.
En esta sección se tratan los siguientes casos prácticos:
- Transmitir datos de Oracle a Cloud Storage
- Enviar datos de MySQL a BigQuery
- Enviar datos de PostgreSQL a BigQuery
- Definir un conjunto de objetos que se incluirán en el flujo
- Rellenar todos los objetos incluidos en el flujo
- Excluir objetos del vídeo
- Excluir objetos del relleno
- Definir CMEK para cifrar datos en reposo
- Definir el modo de escritura de un flujo
- Enviar datos de streaming a otro proyecto de BigQuery
- Emitir datos a tablas gestionadas de BigLake
Ejemplo 1: Transmitir objetos específicos a BigQuery
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmitir datos de MySQL a BigQuery
- Incluir un conjunto de objetos en el flujo
- Definir el modo de escritura de la secuencia como de solo anexión
- Rellenar todos los objetos incluidos en el flujo
A continuación, se muestra una solicitud para extraer todas las tablas de schema1
y dos tablas específicas de schema2
: tableA
y tableC
. Los eventos se escriben en un conjunto de datos de BigQuery.
La solicitud no incluye el parámetro customerManagedEncryptionKey
, por lo que se usa el sistema de gestión de claves interno de Google Cloud para cifrar los datos en lugar de CMEK.
El parámetro backfillAll
asociado a la reposición del historial (o la captura) se define como un diccionario vacío ({}
), lo que significa que DataStream repone el historial de datos de todas las tablas incluidas en el flujo.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una emisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 2: Excluir objetos específicos de una transmisión con una fuente de PostgreSQL
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmitir datos de PostgreSQL a BigQuery
- Excluir objetos del vídeo
- Excluir objetos del relleno
El siguiente código muestra una solicitud para crear un flujo que se usa para transferir datos de una base de datos PostgreSQL de origen a BigQuery. Al crear un flujo a partir de una base de datos PostgreSQL de origen, debe especificar dos campos adicionales específicos de PostgreSQL en su solicitud:
replicationSlot
: un slot de replicación es un requisito previo para configurar una base de datos PostgreSQL para la replicación. Debe crear un espacio de replicación para cada flujo.publication
: una publicación es un grupo de tablas de las que quieres replicar los cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar una emisión. Como mínimo, la publicación debe incluir las tablas especificadas en la listaincludeObjects
de la emisión.
El parámetro backfillAll
asociado a la realización del relleno histórico (o la instantánea) se ha definido para excluir una tabla.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una emisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 3: Especificar el modo de escritura de solo anexión para un flujo
Cuando envías datos en streaming a BigQuery, puedes definir el modo de escritura: merge
o appendOnly
. Para obtener más información, consulta Configurar el modo de escritura.
Si no especificas el modo de escritura en tu solicitud para crear un flujo, se usará el modo merge
predeterminado.
En la siguiente solicitud se muestra cómo definir el modo appendOnly
al crear un flujo de MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una emisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 4: Transmitir a otro proyecto en BigQuery
Si has creado tus recursos de Datastream en un proyecto, pero quieres enviar datos a otro proyecto en BigQuery, puedes hacerlo con una solicitud similar a la siguiente.
Si especifica sourceHierarchyDatasets
en el conjunto de datos de destino, deberá rellenar el campo projectId
.
Si especifica singleTargetDataset
en el conjunto de datos de destino, rellene el campo datasetId
con el formato projectId:datasetId
.
REST
Para sourceHierarchyDatasets
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" }, "projectId": "myProjectId2" } } }, "backfillAll": {} }
Para singleTargetDataset
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "singleTargetDataset": { "datasetId": "myProjectId2:myDatasetId" }, } }, "backfillAll": {} }
gcloud
Para sourceHierarchyDatasets
:
datastream streams create crossProjectBqStream1 --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json --backfill-none
Contenido del archivo de configuración source_hierarchy_cross_project_config.json
:
{"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
Para singleTargetDataset
:
datastream streams create crossProjectBqStream --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json --backfill-none
Contenido del archivo de configuración single_target_cross_project_config.json
:
{"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
Para obtener más información sobre cómo usar gcloud
para crear una emisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 5: Transmitir a un destino de Cloud Storage
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmitir datos de Oracle a Cloud Storage
- Definir un conjunto de objetos que se incluirán en el flujo
- Definir CMEK para cifrar datos en reposo
En la siguiente solicitud se muestra cómo crear un flujo que escriba los eventos en un segmento de Cloud Storage.
En esta solicitud de ejemplo, los eventos se escriben en formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (se sobrescriben los valores predeterminados de 50 MB y 60 segundos).
En el formato JSON, puedes hacer lo siguiente:
Incluye un archivo de esquema de tipos unificados en la ruta. Por lo tanto, Datastream escribe dos archivos en Cloud Storage: un archivo de datos JSON y un archivo de esquema Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, pero con la extensión
.schema
.Habilita la compresión con gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.
Al usar el parámetro backfillNone
, la solicitud especifica que solo se transmiten los cambios en curso al destino, sin rellenar los datos anteriores.
La solicitud especifica el parámetro de clave de cifrado gestionada por el cliente, que te permite controlar las claves que se utilizan para cifrar los datos en reposo de un Google Cloud proyecto. El parámetro hace referencia a la CMEK que usa Datastream para cifrar los datos que se transmiten desde la fuente al destino. También especifica el conjunto de claves de tu CMEK.
Para obtener más información sobre los conjuntos de claves, consulta Recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptado, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una emisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 6: Transmitir datos a una tabla gestionada de BigLake
En este ejemplo, aprenderá a configurar un flujo para replicar datos de una base de datos MySQL en una tabla de Iceberg de BigLake en modo append-only
.
Antes de crear la solicitud, asegúrate de que has completado los siguientes pasos:
- Tener un segmento de Cloud Storage en el que quieras almacenar tus datos
- Crear una conexión de recurso en la nube
- Concede a tu conexión de recursos de Cloud acceso al segmento de Cloud Storage
A continuación, puedes usar la siguiente solicitud para crear tu emisión:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream { "displayName": "MySQL to BigLake stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlBigLakeStream --location=us-central1 --display-name=mysql-to-bl-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=bl_config.json --backfill-none
Contenido del archivo de configuración de la fuente mysql_source_config.json
:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
Contenido del archivo de configuración bl_config.json
:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlStream" location = "us-central1" display_name = "MySQL to BigLake stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
Validar la definición de un flujo
Antes de crear un flujo, puede validar su definición. De esta forma, te aseguras de que se superen todas las comprobaciones de validación y de que la emisión se ejecute correctamente cuando se cree.
Al validar un flujo, se comprueba lo siguiente:
- Si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
- Si el flujo puede conectarse tanto al origen como al destino.
- La configuración integral del flujo.
Para validar un flujo, añade &validate_only=true
a la URL que precede al cuerpo de tu solicitud:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Después de enviar esta solicitud, verá las comprobaciones de validación que Datastream realiza en su fuente y destino, así como si las comprobaciones se han superado o no. Si no se supera alguna comprobación de validación, se mostrará información sobre el motivo del error y sobre qué hacer para corregir el problema.
Por ejemplo, supongamos que tienes una clave de cifrado gestionada por el cliente (CMEK) que quieres que Datastream use para cifrar los datos que se transmiten de la fuente al destino. Como parte de la validación del flujo, Datastream verificará que la clave exista y que Datastream tenga permisos para usarla. Si no se cumple alguna de estas dos condiciones, al validar el flujo, se devolverá el siguiente mensaje de error:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Para solucionar este problema, compruebe que la clave que ha proporcionado existe y que la cuenta de servicio de Datastream tiene el permiso cloudkms.cryptoKeys.get
para la clave.
Después de hacer las correcciones pertinentes, vuelve a enviar la solicitud para asegurarte de que se superan todas las comprobaciones de validación. En el ejemplo anterior, la comprobación CMEK_VALIDATE_PERMISSIONS
ya no devolverá un mensaje de error, sino que tendrá el estado PASSED
.
Obtener información sobre una emisión
En el siguiente código se muestra una solicitud para obtener información sobre una emisión. Entre la información obtenida de esta forma, se incluyen los siguientes datos:
- Nombre del flujo (identificador único)
- Nombre descriptivo de la emisión (nombre visible)
- Marcas de tiempo de cuándo se creó y se actualizó por última vez la emisión
- Información sobre los perfiles de conexión de origen y destino asociados al flujo
- Estado del flujo
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
La respuesta aparece de la siguiente manera:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre tu emisión, consulta la documentación del SDK de Google Cloud.
Mostrar emisiones
El siguiente código muestra una solicitud para obtener una lista de todas las emisiones del proyecto y la ubicación especificados.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre todas tus emisiones, consulta la documentación del SDK de Google Cloud.
Mostrar los objetos de un flujo
En el siguiente código se muestra una solicitud para obtener información sobre todos los objetos de una secuencia.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre todos los objetos de tu flujo, consulta la documentación del SDK de Google Cloud.
La lista de objetos devuelta puede ser similar a la siguiente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Para obtener más información sobre cómo usar gcloud
para enumerar los objetos de un flujo, consulta la documentación del SDK de Google Cloud.
Iniciar un flujo
En el siguiente código se muestra una solicitud para iniciar una emisión.
Si usas el parámetro updateMask
en la solicitud, solo se incluirán en el cuerpo de la solicitud los campos que especifiques. Para iniciar una emisión, cambia el valor del campo state
de CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre cómo usar gcloud
para iniciar tu emisión, consulta la documentación del SDK de Google Cloud.
Pausar un flujo
El siguiente código muestra una solicitud para pausar una emisión en curso.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Al pausar la emisión, cambias su estado de RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Para obtener más información sobre cómo usar gcloud
para pausar tu emisión, consulta la documentación del SDK de Google Cloud.
Reanudar un flujo
El siguiente código muestra una solicitud para reanudar una emisión pausada.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Al reanudar la emisión, cambiarás su estado de PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre cómo usar gcloud
para reanudar tu emisión, consulta la documentación del SDK de Google Cloud.
Recuperar un flujo
Puedes recuperar una emisión que ha fallado permanentemente con el método RunStream
. Cada tipo de base de datos de origen tiene su propia definición de las operaciones de recuperación de la secuencia que se pueden realizar. Para obtener más información, consulta Recuperar una emisión.
Recuperar un flujo de una fuente MySQL u Oracle
En los siguientes ejemplos de código se muestran solicitudes para recuperar un flujo de una fuente de MySQL u Oracle desde varias posiciones de archivo de registro:
REST
Recupera un flujo desde la posición actual. Esta es la opción predeterminada:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recuperar una emisión desde la siguiente posición disponible:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Recuperar una emisión desde la posición más reciente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recuperar un flujo desde una posición específica (replicación basada en binlog de MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Haz los cambios siguientes:
- NAME_OF_THE_LOG_FILE: El nombre del archivo de registro del que quieras recuperar tu emisión
- POSITION: la posición del archivo de registro desde la que quieras recuperar tu emisión. Si no proporciona el valor, Datastream recupera el flujo desde el principio del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recuperar un flujo desde una posición específica (replicación basada en GTID de MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Sustituye GTID_SET por uno o varios GTIDs únicos o intervalos de GTIDs de los que quieras recuperar tu emisión.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
Recuperar un flujo de una posición específica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Para obtener más información sobre las opciones de recuperación disponibles, consulta Recuperar una emisión.
gcloud
No se puede recuperar una emisión con gcloud
.
Recuperar un flujo de una fuente de PostgreSQL
En el siguiente código de ejemplo se muestra una solicitud para recuperar un flujo de una fuente de PostgreSQL. Durante la recuperación, el flujo empieza a leer desde el primer número de secuencia de registro (LSN) de la ranura de replicación configurada para el flujo.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Si quieres cambiar el espacio de réplica, actualiza primero el flujo con el nuevo nombre del espacio de réplica:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
No se puede recuperar una emisión con gcloud
.
Recuperar una secuencia de una fuente de SQL Server
En los siguientes ejemplos de código se muestran solicitudes de ejemplo para recuperar una secuencia de una fuente de SQL Server.
REST
Recuperar una emisión desde la primera posición disponible:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupera un flujo a partir de un número de secuencia de registro preferido:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
No se puede recuperar una emisión con gcloud
.
Iniciar o reanudar una emisión desde una posición específica
Puedes iniciar una transmisión o reanudar una transmisión en pausa desde una posición específica en fuentes de MySQL y Oracle. Esto puede ser útil si quieres realizar un relleno con datos históricos con una herramienta externa o iniciar el CDC desde una posición que indiques. En el caso de una fuente MySQL, debe indicar una posición de registro binario o un conjunto de GTID. En el caso de una fuente Oracle, debe indicar un número de cambio del sistema (SCN) en el archivo de registro de rehacer.
En el siguiente código se muestra una solicitud para iniciar o reanudar una emisión ya creada desde una posición específica.
Iniciar o reanudar una secuencia desde una posición de binlog específica (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Haz los cambios siguientes:
- NAME_OF_THE_LOG_FILE: el nombre del archivo de registro desde el que quieras iniciar la emisión.
- POSITION: la posición del archivo de registro desde la que quieres iniciar la transmisión. Si no proporciona el valor, Datastream empezará a leer desde el principio del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
No se puede iniciar ni reanudar una emisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar o reanudar una emisión, consulta la documentación del SDK de Google Cloud.
Iniciar o reanudar una réplica desde un conjunto de GTIDs específico (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Sustituye GTID_SET por uno o varios GTIDs únicos o intervalos de GTIDs desde los que quieras iniciar o reanudar tu emisión.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
No se puede iniciar ni reanudar una emisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar o reanudar una emisión, consulta la documentación del SDK de Google Cloud.
Iniciar o reanudar una transmisión desde un número de cambio de sistema específico en el archivo de registro de rehacer (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
No se puede iniciar ni reanudar una emisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar una emisión, consulta la documentación del SDK de Google Cloud.
Modificar un flujo
El siguiente código muestra una solicitud para actualizar la configuración de rotación de archivos de una emisión para que el archivo rote cada 75 MB o 45 segundos.
En este ejemplo, los campos especificados para el parámetro updateMask
incluyen los campos fileRotationMb
y fileRotationInterval
, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb
y destinationConfig.gcsDestinationConfig.fileRotationInterval
, respectivamente.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
En el siguiente código se muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de los archivos que Datastream escribe en Cloud Storage. Por lo tanto, Datastream escribe dos archivos: un archivo de datos JSON y un archivo de esquema Avro.
En este ejemplo, el campo especificado es jsonFileFormat
, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
En el siguiente código se muestra una solicitud para que Datastream replique los datos disponibles, además de los cambios que se produzcan en los datos, desde la base de datos de origen hasta el destino.
En la sección oracleExcludedObjects
del código se muestran las tablas y los esquemas que no se pueden rellenar en el destino.
En este ejemplo, se rellenarán todas las tablas y los esquemas, excepto la tabla A del esquema 3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para modificar tu flujo, consulta la documentación del SDK de Google Cloud.
Iniciar el backfill de un objeto de un flujo
Un flujo de Datastream puede rellenar el historial de datos y transmitir los cambios en curso a un destino. Los cambios en curso siempre se transmitirán desde un origen a un destino. Sin embargo, puede especificar si quiere que se transmitan datos históricos.
Si quieres que los datos históricos se transfieran de la fuente al destino, usa el parámetro backfillAll
.
Datastream también te permite transmitir el historial de datos solo de tablas de bases de datos específicas. Para ello, usa el parámetro backfillAll
y excluye las tablas de las que no quieras obtener datos históricos.
Si solo quieres que se transmitan los cambios continuos al destino, usa el parámetro backfillNone
. Si quieres que DataStream transmita una captura de todos los datos disponibles del origen al destino, debes iniciar la reposición manualmente para los objetos que contengan estos datos.
Otro motivo para iniciar el relleno retroactivo de un objeto es que los datos no estén sincronizados entre el origen y el destino. Por ejemplo, un usuario puede eliminar datos de forma accidental en el destino y, por lo tanto, se pierden. En este caso, iniciar el relleno retroactivo del objeto sirve como "mecanismo de restablecimiento", ya que todos los datos se transmiten a la ubicación de destino de una sola vez. Como resultado, los datos se sincronizan entre el origen y el destino.
Para poder iniciar el relleno de un objeto de una secuencia, primero debes obtener información sobre el objeto.
Cada objeto tiene un OBJECT_ID, que lo identifica de forma única. Usa OBJECT_ID para iniciar el relleno de la emisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Para obtener más información sobre cómo usar gcloud
para iniciar el relleno retroactivo de un objeto de tu flujo, consulta la documentación del SDK de Google Cloud.
Detener el backfill de un objeto de un flujo
Después de iniciar el backfill de un objeto de un flujo, puedes detenerlo. Por ejemplo, si un usuario modifica un esquema de base de datos, el esquema o los datos pueden dañarse. No quieres que este esquema o estos datos se transmitan al destino, por lo que detienes el relleno retroactivo del objeto.
También puedes detener el relleno de un objeto para equilibrar la carga. Datastream puede ejecutar varios retornos de datos en paralelo. Esto puede que añada más carga al origen. Si la carga es significativa, detén el relleno para cada objeto y, a continuación, inicia el relleno para los objetos uno a uno.
Para poder detener el relleno de un objeto de un flujo, debes enviar una solicitud para obtener información sobre todos los objetos de un flujo. Cada objeto devuelto tiene un OBJECT_ID, que identifica de forma única el objeto. Usa OBJECT_ID para detener el relleno de la emisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Para obtener más información sobre cómo usar gcloud
para detener el relleno retroactivo de un objeto de tu flujo, consulta la documentación del SDK de Google Cloud.
Cambiar el número máximo de tareas de CDC simultáneas
En el siguiente código se muestra cómo definir en 7 el número máximo de tareas simultáneas de captura de datos de cambios (CDC) de un flujo de MySQL.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentCdcTasks
. Si le asignas el valor 7, cambiarás el número máximo de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores del 0 al 50 (ambos incluidos). Si no defines el valor o lo defines como 0, se asignará el valor predeterminado del sistema, que es de 5 tareas, a la emisión.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, consulta la documentación del SDK de Google Cloud.
Cambiar el número máximo de tareas de relleno simultáneas
En el siguiente código se muestra cómo definir el número máximo de tareas de relleno simultáneas de un flujo de MySQL en 25.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentBackfillTasks
. Si le asigna el valor 25, cambiará el número máximo de tareas de relleno simultáneas del valor anterior a 25. Puedes usar valores del 0 al 50 (ambos incluidos). Si no define el valor o lo define como 0, se asignará al flujo el valor predeterminado del sistema, que es de 16 tareas.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, consulta la documentación del SDK de Google Cloud.
Habilitar la transmisión de objetos grandes para fuentes de Oracle
Puede habilitar el streaming de objetos grandes, como objetos binarios grandes (BLOB
), objetos grandes de caracteres (CLOB
) y objetos grandes de caracteres nacionales (NCLOB
) en las secuencias con fuentes de Oracle. La marca streamLargeObjects
te permite incluir objetos grandes tanto en flujos nuevos como en los que ya tengas. La marca se define a nivel de flujo, por lo que no es necesario especificar las columnas de los tipos de datos de objetos grandes.
En el siguiente ejemplo se muestra cómo crear un flujo que te permita transmitir objetos grandes.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para actualizar una emisión, consulta la documentación del SDK de Google Cloud.
Eliminar un flujo
En el siguiente código se muestra una solicitud para eliminar una emisión.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Para obtener más información sobre cómo usar gcloud
para eliminar tu flujo, consulta la documentación del SDK de Google Cloud.
Siguientes pasos
- Consulta cómo usar la API Datastream para gestionar perfiles de conexión.
- Consulta cómo usar la API de Datastream para gestionar configuraciones de conectividad privada.
- Para obtener más información sobre cómo usar la API Datastream, consulta la documentación de referencia.