En esta página, aprenderás a usar la API de Datastream para lo siguiente:
- Crea transmisiones
- Obtén información sobre las transmisiones y los objetos de transmisión
- Inicia, pausa, reanuda y modifica las transmisiones, así como inicia y detén el reabastecimiento de los objetos de transmisión para actualizarlas
- Cómo recuperar transmisiones que fallaron de forma permanente
- Habilita la transmisión de objetos grandes para las transmisiones de Oracle
- Borra transmisiones
Existen dos formas de usar la API de Datastream. Puedes realizar llamadas a la API de REST o usar Google Cloud CLI (CLI).
Para obtener información de alto nivel sobre el uso de gcloud
para administrar flujos de Datastream, consulta flujos de Datastream de gcloud.
Crea una transmisión
En esta sección, aprenderás a crear un flujo que se usa para transferir datos de tu fuente a un destino. Los siguientes ejemplos no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso de uso específico, usa estos ejemplos junto con la documentación de referencia de la API de Datastream.
En esta sección, se abarcan los siguientes casos de uso:
- Transmite desde Oracle a Cloud Storage
- Cómo transmitir de MySQL a BigQuery
- Cómo transmitir de PostgreSQL a BigQuery
- Define un conjunto de objetos para incluir en la transmisión
- Reabastece todos los objetos incluidos en la transmisión
- Cómo excluir objetos del flujo
- Cómo excluir objetos del reabastecimiento
- Define CMEK para encriptar datos en reposo
- Cómo definir el modo de escritura de una transmisión
Ejemplo 1: Transmite objetos específicos a BigQuery
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmite datos de MySQL a BigQuery
- Cómo incluir un conjunto de objetos en la transmisión
- Define el modo de escritura de la transmisión como solo de inserción
- Reabastece todos los objetos incluidos en la transmisión
La siguiente es una solicitud para extraer todas las tablas de schema1
y dos tablas específicas de schema2
: tableA
y tableC
. Los eventos se escriben en un conjunto de datos
en BigQuery.
La solicitud no incluye el parámetro customerManagedEncryptionKey
, por lo que se usa el Google Cloud sistema de administración de claves interno para encriptar tus datos en lugar de CMEK.
El parámetro backfillAll
asociado con la ejecución del reabastecimiento histórico (o instantánea) se establece en un diccionario vacío ({}
), lo que significa que DataStream reabastece los datos históricos de todas las tablas incluidas en el flujo.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 2: Excluye objetos específicos de una transmisión con una fuente de PostgreSQL
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmite de PostgreSQL a BigQuery
- Cómo excluir objetos del flujo
- Cómo excluir objetos del reabastecimiento
En el siguiente código, se muestra una solicitud para crear un flujo que se usa para transferir datos de una base de datos de PostgreSQL de origen a BigQuery. Cuando creas una transmisión a partir de una base de datos de PostgreSQL de origen, debes especificar dos campos adicionales específicos de PostgreSQL en tu solicitud:
replicationSlot
: Un espacio de replicación es un requisito previo para configurar una base de datos de PostgreSQL para la replicación. Debes crear un horario de replicación para cada flujo.publication
: Una publicación es un grupo de tablas de las que deseas replicar los cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar un flujo. Como mínimo, la publicación debe incluir las tablas especificadas en la listaincludeObjects
del flujo.
El parámetro backfillAll
asociado con la ejecución del reabastecimiento histórico (o instantánea) se establece para excluir una tabla.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 3: Especifica el modo de escritura de solo anexar para una transmisión
Cuando transmites datos a BigQuery, puedes definir el modo de escritura: merge
o appendOnly
. Para obtener más información, consulta Cómo configurar el modo de escritura.
Si no especificas el modo de escritura en tu solicitud para crear un flujo, se usa el modo merge
predeterminada.
En la siguiente solicitud, se muestra cómo definir el modo appendOnly
cuando creas una transmisión de MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 4: Cómo transmitir a un destino de Cloud Storage
En este ejemplo, aprenderás a hacer lo siguiente:
- Cómo transmitir desde Oracle a Cloud Storage
- Define un conjunto de objetos para incluir en la transmisión
- Define CMEK para encriptar datos en reposo
En la siguiente solicitud, se muestra cómo crear un flujo que escribe los eventos en un bucket de Cloud Storage.
En esta solicitud de ejemplo, los eventos se escriben en el formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (se anulan los valores predeterminados de 50 MB y 60 segundos).
En el formato JSON, puedes hacer lo siguiente:
Incluye un archivo de esquema de tipos unificados en la ruta de acceso. Como resultado, Datastream escribe dos archivos en Cloud Storage: un archivo de datos JSON y un archivo de esquema Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, con la extensión
.schema
.Habilita la compresión gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.
Cuando se usa el parámetro backfillNone
, la solicitud especifica que solo los cambios en curso se transmiten al destino, sin reabastecimiento.
La solicitud especifica el parámetro de clave de encriptación administrada por el cliente, que te permite controlar las claves que se usan para encriptar datos en reposo dentro de un proyecto Google Cloud . El parámetro hace referencia a la CMEK que Datastream usa para encriptar los datos que se transmiten desde la fuente al destino. También especifica el llavero de claves de tu CMEK.
Para obtener más información sobre los llaveros de claves, consulta Recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptación, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Valida la definición de una transmisión
Antes de crear una transmisión, puedes validar su definición. De esta manera, puedes asegurarte de que se aprueben todas las verificaciones de validación y de que la transmisión se ejecute correctamente cuando se cree.
La validación de una transmisión verifica lo siguiente:
- Si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
- Si la transmisión puede conectarse tanto al origen como al destino.
- Es la configuración de extremo a extremo de la transmisión.
Para validar una transmisión, agrega &validate_only=true
a la URL que precede al cuerpo de la solicitud:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Después de realizar esta solicitud, verás las verificaciones de validación que ejecuta Datastream para tu fuente y tu destino, junto con si las verificaciones se aprueban o no. En el caso de las verificaciones de validación que no se aprueben, aparecerá información sobre por qué fallaron y qué hacer para rectificar el problema.
Por ejemplo, supongamos que tienes una clave de encriptación administrada por el cliente (CMEK) que deseas que Datastream use para encriptar los datos que se transmiten de la fuente al destino. Como parte de la validación de la transmisión, Datastream verificará que la clave exista y que Datastream tenga permisos para usarla. Si no se cumple alguna de estas condiciones, cuando valides la transmisión, se mostrará el siguiente mensaje de error:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Para resolver este problema, verifica que la clave que proporcionaste exista y que la cuenta de servicio de Datastream tenga el permiso cloudkms.cryptoKeys.get
para la clave.
Después de hacer las correcciones apropiadas, vuelve a realizar la solicitud para asegurarte de que todas las verificaciones de validación se aprueben. En el ejemplo anterior, la verificación de CMEK_VALIDATE_PERMISSIONS
ya no mostrará un mensaje de error, pero tendrá un estado de PASSED
.
Obtén información sobre una transmisión
En el siguiente código, se muestra una solicitud para recuperar información sobre una transmisión. Esta información incluye lo siguiente:
- El nombre del flujo (identificador único)
- Un nombre fácil de usar para la transmisión (nombre visible)
- Marcas de tiempo de la creación y la última actualización del flujo
- Información sobre los perfiles de conexión de origen y destino asociados con la transmisión
- El estado de la transmisión
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
La respuesta aparece de la siguiente manera:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Si deseas obtener más información para usar gcloud
y recuperar información sobre tu transmisión, haz clic aquí.
Enumera transmisiones
En el siguiente código, se muestra una solicitud para recuperar una lista de todas las transmisiones en el proyecto y la ubicación especificados.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Si deseas obtener más información para usar gcloud
y recuperar información sobre todas tus transmisiones, haz clic aquí.
Enumera objetos de una transmisión
En el siguiente código, se muestra una solicitud para recuperar información sobre todos los objetos de un flujo.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Si deseas obtener más información para usar gcloud
y recuperar información sobre todos los objetos de tu transmisión, haz clic aquí.
La lista de objetos que se muestra puede ser similar a la siguiente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Para obtener más información sobre el uso de gcloud
para enumerar objetos de un flujo, haz clic aquí.
Iniciar una transmisión
En el siguiente código, se muestra una solicitud para iniciar una transmisión.
Usar el parámetro updateMask
en la solicitud hace que solo los campos que especifiques deban incluirse en el cuerpo de la solicitud. Para iniciar una transmisión, cambia el valor del campo state
de CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre el uso de gcloud
para iniciar la transmisión, haz clic aquí.
Pausar una transmisión
En el siguiente código, se muestra una solicitud para pausar una transmisión en ejecución.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Cuando pausas la transmisión, cambias su estado de RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Para obtener más información sobre el uso de gcloud
para pausar la transmisión, haz clic aquí.
Reanudar una transmisión
En el siguiente código, se muestra una solicitud para reanudar una transmisión pausada.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Cuando reanudas la transmisión, cambias su estado de PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre el uso de gcloud
para reanudar la transmisión, haz clic aquí.
Cómo recuperar una transmisión
Puedes recuperar una transmisión con errores permanentes con el método RunStream
. Cada tipo de base de datos de origen tiene su propia definición de qué operaciones de recuperación de flujos son posibles. Para obtener más información, consulta Cómo recuperar una transmisión.
Cómo recuperar una transmisión para una fuente de MySQL o Oracle
En las siguientes muestras de código, se muestran solicitudes para recuperar una transmisión de una fuente de MySQL o Oracle desde varias posiciones de archivos de registro:
REST
Recupera una transmisión desde la posición actual. Esta es la opción predeterminada:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recupera una transmisión desde la siguiente posición disponible:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Cómo recuperar una transmisión desde la posición más reciente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recupera una transmisión desde una posición específica (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Reemplaza lo siguiente:
- NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas recuperar tu transmisión.
- POSITION: Es la posición en el archivo de registro desde la que deseas recuperar tu transmisión. Si no proporcionas el valor, Datastream recupera la transmisión desde el principio del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recupera una transmisión desde una posición específica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Para obtener más información sobre las opciones de recuperación disponibles, consulta Cómo recuperar una transmisión.
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Cómo recuperar una transmisión para una fuente de PostgreSQL
En la siguiente muestra de código, se muestra una solicitud para recuperar una transmisión de una fuente de PostgreSQL. Durante la recuperación, la transmisión comienza a leer desde el primer número de secuencia de registro (LSN) en la ranura de replicación configurada para la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Si quieres cambiar el espacio de replicación, primero actualiza el flujo con el nuevo nombre de la ranura de replicación:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Cómo recuperar una transmisión para una fuente de SQL Server
En las siguientes muestras de código, se muestran solicitudes de ejemplo para recuperar una transmisión de una fuente de SQL Server.
REST
Recupera una transmisión desde la primera posición disponible:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupera una transmisión desde un número de secuencia de registro preferido:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Cómo iniciar o reanudar una transmisión desde una posición específica
Puedes iniciar una transmisión o reanudar una transmisión detenida desde una posición específica para las fuentes de MySQL y Oracle. Esto puede ser útil cuando deseas reabastecer con una herramienta externa o iniciar el CDC desde una posición que indiques. Para una fuente de MySQL, debes indicar una posición de registro binario. Para una fuente de Oracle, debes indicar un número de cambio del sistema (SCN) en el archivo de registro de rehacer.
En el siguiente código, se muestra una solicitud para iniciar o reanudar una transmisión ya creada desde una posición específica.
Inicia o reanuda una transmisión desde una posición de binlog específica (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Reemplaza lo siguiente:
- NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas iniciar la transmisión.
- POSITION: Es la posición en el archivo de registro desde la que deseas iniciar la transmisión. Si no proporcionas el valor, Datastream comenzará a leer desde el principio del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
No se admite iniciar o reanudar una transmisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar o reanudar una transmisión, consulta la documentación del SDK de Cloud.
Iniciar o reanudar una transmisión desde un número de cambio del sistema específico en el archivo de registro de redo (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
No se admite iniciar o reanudar una transmisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar una transmisión, consulta la documentación del SDK de Cloud.
Modificar una transmisión
En el siguiente código, se muestra una solicitud para actualizar la configuración de rotación de archivos de una transmisión para que el archivo se rote cada 75 MB o 45 segundos.
En este ejemplo, los campos especificados para el parámetro updateMask
incluyen los campos fileRotationMb
y fileRotationInterval
, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb
y destinationConfig.gcsDestinationConfig.fileRotationInterval
, respectivamente.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
En el siguiente código, se muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de acceso de los archivos que Datastream escribe en Cloud Storage. Como resultado, Datastream escribe dos archivos: un archivo de datos JSON y un archivo de esquema Avro.
En este ejemplo, el campo especificado es el campo jsonFileFormat
, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
En el siguiente código, se muestra una solicitud para que Datastream replique los datos existentes, además de los cambios continuos en los datos, de la base de datos de origen al destino.
En la sección oracleExcludedObjects
del código, se muestran las tablas y esquemas que están restringidas y no se pueden reabastecer en el destino.
Para este ejemplo, se reabastecerán todas las tablas y los esquemas, excepto tableA en schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para modificar tu transmisión, haz clic aquí.
Inicia el reabastecimiento de un objeto de una transmisión
Una transmisión en Datastream puede reabastecer datos históricos, así como transmitir cambios continuos a un destino. Los cambios en curso siempre se transmitirán de una fuente a un destino. Sin embargo, puedes especificar si deseas que se transmitan datos históricos.
Si deseas que los datos históricos se transmitan desde la fuente al destino, usa el parámetro backfillAll
.
Datastream también te permite transmitir datos históricos solo para tablas de bases de datos específicas. Para ello, usa el parámetro backfillAll
y excluye las tablas para las que no deseas datos históricos.
Si deseas que solo se transmitan los cambios en curso al destino, usa el parámetro backfillNone
. Si deseas que Datastream transmita una instantánea de todos los datos existentes de la fuente al destino, debes iniciar el reabastecimiento de forma manual para los objetos que contienen estos datos.
Otro motivo para iniciar el reabastecimiento de un objeto es si los datos no están sincronizados entre la fuente y el destino. Por ejemplo, un usuario puede borrar datos del destino por error, y estos se pierden. En este caso, iniciar el reabastecimiento del objeto funciona como un "mecanismo de restablecimiento", ya que todos los datos se transmiten al destino de una sola vez. Como resultado, los datos se sincronizan entre la fuente y el destino.
Antes de iniciar el reabastecimiento de un objeto de un flujo, debes recuperar información sobre el objeto.
Cada objeto tiene un OBJECT_ID, que lo identifica de forma inequívoca. Usas OBJECT_ID para iniciar el reabastecimiento de la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Para obtener más información sobre el uso de gcloud
para iniciar el reabastecimiento de un objeto de tu flujo, consulta la documentación del SDK de Google Cloud.
Detén el reabastecimiento de un objeto de una transmisión
Después de iniciar el reabastecimiento de un objeto de una transmisión, puedes detenerlo. Por ejemplo, si un usuario modifica un esquema de base de datos, es posible que el esquema o los datos se dañen. No quieres que este esquema o estos datos se transmitan al destino, por lo que detienes el reabastecimiento del objeto.
También puedes detener el reabastecimiento de un objeto para el balanceo de cargas. Datastream puede ejecutar varios reabastecimientos en paralelo. Es posible que se agregue carga adicional a la fuente. Si la carga es significativa, detén el reabastecimiento de cada objeto y, luego, inícialo uno por uno.
Para poder detener el reabastecimiento de un objeto de una transmisión, debes enviar una solicitud para recuperar información sobre todos los objetos de una transmisión. Cada objeto que se muestra tiene un OBJECT_ID, que lo identifica de forma inequívoca. Usa OBJECT_ID para detener el reabastecimiento de la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Para obtener más información sobre el uso de gcloud
para detener el reabastecimiento de un objeto de tu transmisión, haz clic aquí.
Cambia la cantidad máxima de tareas de CDC simultáneas
En el siguiente código, se muestra cómo establecer en 7 la cantidad máxima de tareas de captura de datos modificados (CDC) simultáneas para un flujo de MySQL.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentCdcTasks
. Si estableces su valor en 7, cambias la cantidad máxima de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores de 0 a 50 (inclusive). Si no defines el valor o lo defines como 0, se establece el valor predeterminado del sistema de 5 tareas para la transmisión.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, haz clic aquí.
Cambia la cantidad máxima de tareas de reabastecimiento simultáneas
En el siguiente código, se muestra cómo establecer la cantidad máxima de tareas de reabastecimiento simultáneas para un flujo de MySQL en 25.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentBackfillTasks
. Si estableces su valor en 25, cambias la cantidad máxima de tareas de reabastecimiento simultáneas del valor anterior a 25. Puedes usar valores de 0 a 50 (inclusive). Si no defines el valor o lo defines como 0, se establecerá el valor predeterminado del sistema de 16 tareas para la transmisión.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, haz clic aquí.
Habilita la transmisión de objetos grandes para fuentes de Oracle
Puedes habilitar la transmisión de objetos grandes, como objetos grandes binarios (BLOB
),
objetos grandes de caracteres (CLOB
) y objetos grandes de caracteres nacionales (NCLOB
)
para transmisiones con fuentes de Oracle. La marca streamLargeObjects
te permite incluir objetos grandes en transmisiones nuevas y existentes. La marca se establece a nivel del flujo, así que no necesitas especificar las columnas de los tipos de datos de objetos grandes.
En el siguiente ejemplo, se muestra cómo crear una transmisión que te permita transmitir objetos grandes.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para actualizar una transmisión, consulta la documentación del SDK de Google Cloud.
Borrar una transmisión
En el siguiente código, se muestra una solicitud para borrar una transmisión.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Para obtener más información sobre el uso de gcloud
para borrar tu transmisión, haz clic aquí.
¿Qué sigue?
- Obtén información para usar la API de Datastream para administrar perfiles de conexión.
- Obtén información para usar la API de Datastream y administrar configuraciones de conectividad privada.
- Para obtener más información sobre el uso de la API de Datastream, consulta la documentación de referencia.