En esta página, aprenderás a usar la API de Datastream para realizar las siguientes acciones:
- Crea transmisiones
- Obtén información sobre transmisiones y objetos de transmisiones
- Actualiza las transmisiones iniciándolas, pausándolas, reanudándolas y modificándolas, así como iniciando y deteniendo el reabastecimiento de los objetos de transmisión.
- Recupera transmisiones con errores permanentes
- Habilita la transmisión de objetos grandes para las transmisiones de Oracle
- Borra transmisiones
Existen dos formas de usar la API de Datastream. Puedes realizar llamadas a la API de REST o usar Google Cloud CLI.
Para obtener información general sobre el uso de Google Cloud CLI
para administrar transmisiones de Datastream, consulta Transmisiones de Datastream de la CLI de gcloud.
Crea una transmisión
En esta sección, aprenderás a crear un flujo que se usa para transferir datos de tu fuente a un destino. Los siguientes ejemplos no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso de uso específico, usa estos ejemplos junto con la documentación de referencia de la API de Datastream.
En esta sección, se abarcan los siguientes casos de uso:
- Transmite datos de Oracle a Cloud Storage
- Transmite de MySQL a BigQuery
- Transmite datos de PostgreSQL a BigQuery
- Define un conjunto de objetos para incluir en la transmisión
- Reabastece todos los objetos incluidos en la transmisión
- Cómo excluir objetos de la transmisión
- Excluye objetos del reabastecimiento
- Define la CMEK para encriptar datos en reposo
- Cómo definir el modo de escritura para una transmisión
- Transmite a un proyecto diferente en BigQuery
- Transmite datos a tablas administradas de BigLake
Ejemplo 1: Transmite objetos específicos a BigQuery
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmite datos de MySQL a BigQuery
- Incluye un conjunto de objetos en la transmisión
- Define el modo de escritura de la transmisión como de solo agregar
- Reabastece todos los objetos incluidos en la transmisión
A continuación, se muestra una solicitud para extraer todas las tablas de schema1
y dos tablas específicas de schema2
: tableA
y tableC
. Los eventos se escriben en un conjunto de datos en BigQuery.
La solicitud no incluye el parámetro customerManagedEncryptionKey
, por lo que se usa el sistema interno de administración de claves de Google Cloud para encriptar tus datos en lugar de CMEK.
El parámetro backfillAll
asociado con la realización del reabastecimiento histórico (o instantánea) se establece en un diccionario vacío ({}
), lo que significa que Datastream reabastece los datos históricos de todas las tablas incluidas en el flujo.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 2: Excluye objetos específicos de una transmisión con una fuente de PostgreSQL
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmite datos de PostgreSQL a BigQuery
- Cómo excluir objetos de la transmisión
- Excluye objetos del reabastecimiento
En el siguiente código, se muestra una solicitud para crear una transmisión que se usa para transferir datos de una base de datos de PostgreSQL de origen a BigQuery. Cuando creas una transmisión a partir de una base de datos de PostgreSQL de origen, debes especificar dos campos adicionales específicos de PostgreSQL en tu solicitud:
replicationSlot
: Una ranura de replicación es un requisito previo para configurar una base de datos de PostgreSQL para la replicación. Debes crear una ranura de replicación para cada transmisión.publication
: Una publicación es un grupo de tablas de las que deseas replicar los cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar un flujo. Como mínimo, la publicación debe incluir las tablas especificadas en la listaincludeObjects
de la transmisión.
El parámetro backfillAll
asociado con la realización del reabastecimiento histórico (o instantánea) se configura para excluir una tabla.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 3: Especifica el modo de escritura de solo anexar para una transmisión
Cuando transmites datos a BigQuery, puedes definir el modo de escritura: merge
o appendOnly
. Para obtener más información, consulta Cómo configurar el modo de escritura.
Si no especificas el modo de escritura en tu solicitud para crear una transmisión, se usa el modo merge
predeterminado.
En la siguiente solicitud, se muestra cómo definir el modo appendOnly
cuando creas una transmisión de MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 4: Transmite a un proyecto diferente en BigQuery
Si creaste tus recursos de Datastream en un proyecto, pero deseas transmitir datos a otro proyecto en BigQuery, puedes hacerlo con una solicitud similar a la siguiente.
Si especificas sourceHierarchyDatasets
para tu conjunto de datos de destino, deberás completar el campo projectId
.
Si especificas singleTargetDataset
para tu conjunto de datos de destino, completa el campo datasetId
en el formato projectId:datasetId
.
REST
Para sourceHierarchyDatasets
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" }, "projectId": "myProjectId2" } } }, "backfillAll": {} }
Para singleTargetDataset
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "singleTargetDataset": { "datasetId": "myProjectId2:myDatasetId" }, } }, "backfillAll": {} }
gcloud
Para sourceHierarchyDatasets
:
datastream streams create crossProjectBqStream1 --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json --backfill-none
El contenido del archivo de configuración source_hierarchy_cross_project_config.json
es el siguiente:
{"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
Para singleTargetDataset
:
datastream streams create crossProjectBqStream --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json --backfill-none
El contenido del archivo de configuración single_target_cross_project_config.json
es el siguiente:
{"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 5: Transmite a un destino de Cloud Storage
En este ejemplo, aprenderás a hacer lo siguiente:
- Transmite datos de Oracle a Cloud Storage
- Define un conjunto de objetos para incluir en la transmisión
- Define la CMEK para encriptar datos en reposo
En la siguiente solicitud, se muestra cómo crear una transmisión que escriba los eventos en un bucket de Cloud Storage.
En este ejemplo de solicitud, los eventos se escriben en el formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (se anulan los valores predeterminados de 50 MB y 60 segundos).
En el formato JSON, puedes hacer lo siguiente:
Incluye un archivo de esquema de tipos unificados en la ruta de acceso. Como resultado, Datastream escribe dos archivos en Cloud Storage: un archivo de datos JSON y un archivo de esquema Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, con una extensión
.schema
.Habilita la compresión gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.
Con el parámetro backfillNone
, la solicitud especifica que solo los cambios en curso se transmiten al destino, sin relleno.
La solicitud especifica el parámetro de clave de encriptación administrada por el cliente, que te permite controlar las claves que se usan para encriptar datos en reposo dentro de un proyecto de Google Cloud . El parámetro hace referencia a la CMEK que Datastream usa para encriptar los datos que se transmiten desde la fuente hasta el destino. También especifica el llavero de claves de tu CMEK.
Para obtener más información sobre los llaveros de claves, consulta Recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptación, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para crear una transmisión, consulta la documentación del SDK de Google Cloud.
Ejemplo 6: Transmite datos a una tabla administrada de BigLake
En este ejemplo, aprenderás a configurar una transmisión para replicar datos de una base de datos de MySQL a una tabla de Iceberg de BigLake en modo append-only
.
Antes de crear la solicitud, asegúrate de haber completado los siguientes pasos:
- Tener un bucket de Cloud Storage en el que quieras almacenar tus datos
- Crea una conexión de recursos de Cloud
- Otorga acceso a la conexión de recursos de Cloud al bucket de Cloud Storage
Luego, puedes usar la siguiente solicitud para crear tu transmisión:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream { "displayName": "MySQL to BigLake stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlBigLakeStream --location=us-central1 --display-name=mysql-to-bl-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=bl_config.json --backfill-none
El contenido del archivo de configuración de origen mysql_source_config.json
es el siguiente:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
El contenido del archivo de configuración bl_config.json
es el siguiente:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlStream" location = "us-central1" display_name = "MySQL to BigLake stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
Valida la definición de una transmisión
Antes de crear una transmisión, puedes validar su definición. De esta manera, puedes asegurarte de que todas las verificaciones de validación se aprueben y de que la transmisión se ejecute correctamente cuando se cree.
La validación de una transmisión verifica lo siguiente:
- Indica si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
- Si la transmisión puede conectarse tanto al origen como al destino.
- Es la configuración de extremo a extremo de la transmisión.
Para validar una transmisión, agrega &validate_only=true
a la URL que precede al cuerpo de la solicitud:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Después de que realices esta solicitud, verás las verificaciones de validación que Datastream ejecuta para tu fuente y tu destino, junto con la información sobre si las verificaciones se aprobaron o rechazaron. En el caso de cualquier verificación de validación que no se apruebe, aparecerá información sobre por qué falló y qué hacer para rectificar el problema.
Por ejemplo, supongamos que tienes una clave de encriptación administrada por el cliente (CMEK) que deseas que Datastream use para encriptar los datos que se transmiten desde la fuente al destino. Como parte de la validación de la transmisión, Datastream verificará que la clave exista y que Datastream tenga permisos para usarla. Si no se cumple alguna de estas condiciones, cuando valides la transmisión, se mostrará el siguiente mensaje de error:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Para resolver este problema, verifica que la clave que proporcionaste exista y que la cuenta de servicio de Datastream tenga el permiso cloudkms.cryptoKeys.get
para la clave.
Después de hacer las correcciones apropiadas, vuelve a realizar la solicitud para asegurarte de que todas las verificaciones de validación se aprueben. En el ejemplo anterior, la verificación de CMEK_VALIDATE_PERMISSIONS
ya no mostrará un mensaje de error, sino que tendrá el estado PASSED
.
Obtén información sobre una transmisión
En el siguiente código, se muestra una solicitud para recuperar información sobre una transmisión. Esta información incluye lo siguiente:
- El nombre de la transmisión (identificador único)
- Nombre fácil de usar para la transmisión (nombre visible)
- Marcas de tiempo de cuándo se creó y actualizó por última vez la transmisión
- Es la información sobre los perfiles de conexión de origen y destino asociados a la transmisión.
- Estado de la transmisión
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
La respuesta aparece de la siguiente manera:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre tu transmisión, consulta la documentación del SDK de Google Cloud.
Enumera transmisiones
En el siguiente código, se muestra una solicitud para recuperar una lista de todas las transmisiones en el proyecto y la ubicación especificados.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre todas tus transmisiones, consulta la documentación del SDK de Google Cloud.
Enumera objetos de una transmisión
En el siguiente código, se muestra una solicitud para recuperar información sobre todos los objetos de una transmisión.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Para obtener más información sobre cómo usar gcloud
para recuperar información sobre todos los objetos de tu transmisión, consulta la documentación del SDK de Google Cloud.
La lista de objetos que se muestra puede ser similar a la siguiente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Para obtener más información sobre el uso de gcloud
para enumerar objetos de una transmisión, consulta la documentación del SDK de Google Cloud.
Iniciar una transmisión
En el siguiente código, se muestra una solicitud para iniciar una transmisión.
Usar el parámetro updateMask
en la solicitud hace que solo los campos que especifiques deban incluirse en el cuerpo de la solicitud. Para iniciar una transmisión, cambia el valor del campo state
de CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre cómo usar gcloud
para iniciar tu transmisión, consulta la documentación del SDK de Google Cloud.
Pausar una transmisión
En el siguiente código, se muestra una solicitud para pausar una transmisión en ejecución.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Cuando pausas la transmisión, cambias su estado de RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Para obtener más información sobre cómo usar gcloud
para pausar tu transmisión, consulta la documentación del SDK de Google Cloud.
Reanudar una transmisión
En el siguiente código, se muestra una solicitud para reanudar una transmisión en pausa.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo state
. Cuando reanudas la transmisión, cambias su estado de PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Para obtener más información sobre cómo usar gcloud
para reanudar tu transmisión, consulta la documentación del SDK de Google Cloud.
Recupera una transmisión
Puedes recuperar una transmisión con errores permanentes con el método RunStream
. Cada tipo de base de datos de origen tiene su propia definición de las operaciones de recuperación de transmisiones posibles. Para obtener más información, consulta Cómo recuperar una transmisión.
Recupera una transmisión para una fuente de MySQL o de Oracle
En las siguientes muestras de código, se muestran solicitudes para recuperar una transmisión de una fuente de MySQL o de Oracle desde varias posiciones de archivos de registro:
REST
Recupera una transmisión desde la posición actual. Esta es la opción predeterminada:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Para recuperar una transmisión desde la siguiente posición disponible, haz lo siguiente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Para recuperar una transmisión desde la posición más reciente, haz lo siguiente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recupera una transmisión desde una posición específica (replicación basada en el registro binario de MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Reemplaza lo siguiente:
- NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro del que deseas recuperar tu transmisión.
- POSITION: Es la posición en el archivo de registro desde la que deseas recuperar tu transmisión. Si no proporcionas el valor, Datastream recupera la transmisión desde el encabezado del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recupera una transmisión desde una posición específica (replicación basada en GTID de MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Reemplaza GTID_SET por uno o más GTID únicos o rangos de GTID desde los que deseas recuperar tu transmisión.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
Cómo recuperar una transmisión desde una posición específica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Para obtener más información sobre las opciones de recuperación disponibles, consulta Cómo recuperar una transmisión.
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Recupera una transmisión para una fuente de PostgreSQL
En la siguiente muestra de código, se muestra una solicitud para recuperar una transmisión de una fuente de PostgreSQL. Durante la recuperación, la transmisión comienza a leer desde el primer número de secuencia de registro (LSN) en la ranura de replicación configurada para la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Si deseas cambiar la ranura de replicación, primero actualiza la transmisión con el nuevo nombre de la ranura de replicación:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Recupera una transmisión para una fuente de SQL Server
En las siguientes muestras de código, se muestran ejemplos de solicitudes para recuperar una transmisión de una fuente de SQL Server.
REST
Para recuperar una transmisión desde la primera posición disponible, haz lo siguiente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Para recuperar una transmisión desde un número de secuencia de registro preferido, haz lo siguiente:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
No se admite la recuperación de una transmisión con gcloud
.
Cómo iniciar o reanudar una transmisión desde una posición específica
Puedes iniciar o reanudar una transmisión detenida desde una posición específica para las fuentes de MySQL y Oracle. Esto puede ser útil cuando deseas realizar un reabastecimiento con una herramienta externa o iniciar la CDC desde una posición que indiques. En el caso de una fuente de MySQL, debes indicar una posición de binlog o un conjunto de GTID. En el caso de una fuente de Oracle, debes indicar un número de cambio del sistema (SCN) en el archivo de registro de rehacer.
En el siguiente código, se muestra una solicitud para iniciar o reanudar una transmisión ya creada desde una posición específica.
Cómo iniciar o reanudar una transmisión desde una posición específica del binlog (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Reemplaza lo siguiente:
- NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas iniciar tu transmisión.
- POSITION: Es la posición en el archivo de registro desde la que deseas iniciar tu transmisión. Si no proporcionas el valor, Datastream comenzará a leer desde el encabezado del archivo.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
No se admite iniciar o reanudar una transmisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar o reanudar una transmisión, consulta la documentación del SDK de Cloud.
Cómo iniciar o reanudar una transmisión desde un conjunto de GTID específico (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Reemplaza GTID_SET por uno o más GTID únicos o rangos de GTID desde los que deseas iniciar o reanudar tu transmisión.
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
No se admite iniciar o reanudar una transmisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar o reanudar una transmisión, consulta la documentación del SDK de Cloud.
Inicia o reanuda una transmisión desde un número de cambio del sistema específico en el archivo de registro de rehacer (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Por ejemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
No se admite iniciar o reanudar una transmisión desde una posición específica con gcloud
. Para obtener información sobre cómo usar gcloud
para iniciar una transmisión, consulta la documentación del SDK de Cloud.
Modificar una transmisión
En el siguiente código, se muestra una solicitud para actualizar la configuración de rotación de archivos de una transmisión y rotar el archivo cada 75 MB o 45 s.
En este ejemplo, los campos especificados para el parámetro updateMask
incluyen los campos fileRotationMb
y fileRotationInterval
, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb
y destinationConfig.gcsDestinationConfig.fileRotationInterval
, respectivamente.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
En el siguiente código, se muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de acceso de los archivos que Datastream escribe en Cloud Storage. Como resultado, Datastream escribe dos archivos: un archivo de datos JSON y un archivo de esquema Avro.
En este ejemplo, el campo especificado es el campo jsonFileFormat
, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
El siguiente código muestra una solicitud para que Datastream replique los datos existentes, además de los cambios continuos en los datos, de la base de datos de origen al destino.
En la sección oracleExcludedObjects
del código, se muestran las tablas y esquemas que están restringidas y no se pueden reabastecer en el destino.
Para este ejemplo, se reabastecerán todas las tablas y los esquemas, excepto tableA en schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para modificar tu transmisión, consulta la documentación del SDK de Google Cloud.
Inicia el reabastecimiento de un objeto de una transmisión
Una transmisión en Datastream puede reabastecer datos históricos y transmitir cambios continuos a un destino. Los cambios en curso siempre se transmitirán desde una fuente a un destino. Sin embargo, puedes especificar si deseas que se transmitan los datos históricos.
Si deseas que los datos históricos se transmitan desde la fuente al destino, usa el parámetro backfillAll
.
Datastream también te permite transmitir datos históricos solo para tablas de bases de datos específicas. Para ello, usa el parámetro backfillAll
y excluye las tablas para las que no quieras datos históricos.
Si solo deseas que se transmitan los cambios en curso al destino, usa el parámetro backfillNone
. Si luego deseas que Datastream transmita una instantánea de todos los datos existentes desde la fuente al destino, debes iniciar el reabastecimiento de forma manual para los objetos que contienen estos datos.
Otro motivo para iniciar el reabastecimiento de un objeto es si los datos no están sincronizados entre la fuente y el destino. Por ejemplo, un usuario puede borrar datos en el destino de forma inadvertida, y los datos se pierden. En este caso, iniciar el reabastecimiento del objeto funciona como un "mecanismo de restablecimiento", ya que todos los datos se transmiten al destino de una sola vez. Como resultado, los datos se sincronizan entre la fuente y el destino.
Antes de iniciar el relleno de un objeto de una transmisión, debes recuperar información sobre el objeto.
Cada objeto tiene un OBJECT_ID, que lo identifica de forma única. Usas OBJECT_ID para iniciar el reabastecimiento de la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Para obtener más información sobre cómo usar gcloud
para iniciar el reabastecimiento de un objeto de tu transmisión, consulta la documentación del SDK de Google Cloud.
Detén el reabastecimiento de un objeto de una transmisión
Después de iniciar el reabastecimiento de un objeto de una transmisión, puedes detener el reabastecimiento del objeto. Por ejemplo, si un usuario modifica un esquema de base de datos, es posible que el esquema o los datos se dañen. No quieres que este esquema o estos datos se transmitan al destino, por lo que detienes el reabastecimiento del objeto.
También puedes detener el reabastecimiento de un objeto para balancear la carga. Datastream puede ejecutar varios reabastecimientos en paralelo. Es posible que se agregue carga adicional a la fuente. Si la carga es significativa, detén el reabastecimiento de cada objeto y, luego, inicia el reabastecimiento de los objetos uno por uno.
Antes de detener el reabastecimiento de un objeto de una transmisión, debes realizar una solicitud para recuperar información sobre todos los objetos de una transmisión. Cada objeto que se devuelve tiene un OBJECT_ID, que lo identifica de forma única. Usas OBJECT_ID para detener el reabastecimiento de la transmisión.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Para obtener más información sobre cómo usar gcloud
para detener el reabastecimiento de un objeto de tu transmisión, consulta la documentación del SDK de Google Cloud.
Cómo cambiar la cantidad máxima de tareas de CDC simultáneas
En el siguiente código, se muestra cómo establecer en 7 la cantidad máxima de tareas simultáneas de captura de datos modificados (CDC) para un flujo de MySQL.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentCdcTasks
. Si estableces su valor en 7, cambiarás la cantidad máxima de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores del 0 al 50 (inclusive). Si no defines el valor o lo defines como 0, se establece el valor predeterminado del sistema de 5 tareas para la transmisión.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, consulta la documentación del SDK de Google Cloud.
Cómo cambiar la cantidad máxima de tareas de reabastecimiento simultáneas
En el siguiente código, se muestra cómo establecer en 25 la cantidad máxima de tareas de relleno simultáneas para una transmisión de MySQL.
En este ejemplo, el campo especificado para el parámetro updateMask
es el campo maxConcurrentBackfillTasks
. Si estableces su valor en 25, cambiarás la cantidad máxima de tareas de relleno simultáneas del valor anterior a 25. Puedes usar valores del 0 al 50 (inclusive). Si no defines el valor o lo defines como 0, se establece el valor predeterminado del sistema de 16 tareas para la transmisión.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Para obtener más información sobre el uso de gcloud
, consulta la documentación del SDK de Google Cloud.
Habilita la transmisión de objetos grandes para las fuentes de Oracle
Puedes habilitar la transmisión de objetos grandes, como objetos binarios grandes (BLOB
), objetos grandes de caracteres (CLOB
) y objetos grandes de caracteres nacionales (NCLOB
) para transmisiones con fuentes de Oracle. La marca streamLargeObjects
te permite incluir objetos grandes en transmisiones nuevas y existentes. La marca se establece a nivel de la transmisión, por lo que no es necesario que especifiques las columnas de tipos de datos de objetos grandes.
En el siguiente ejemplo, se muestra cómo crear una transmisión que te permita transmitir objetos grandes.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Para obtener más información sobre cómo usar gcloud
para actualizar una transmisión, consulta la documentación del SDK de Google Cloud.
Borrar una transmisión
En el siguiente código, se muestra una solicitud para borrar una transmisión.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Para obtener más información sobre cómo usar gcloud
para borrar tu transmisión, consulta la documentación del SDK de Google Cloud.
¿Qué sigue?
- Aprende a usar la API de Datastream para administrar perfiles de conexión.
- Obtén información para usar la API de Datastream para administrar la configuración de conectividad privada.
- Para obtener más información sobre el uso de la API de Datastream, consulta la documentación de referencia.