Administrar transmisiones

Descripción general

En esta sección, aprenderás a usar la API de Datastream para hacer lo siguiente:

  • Crear transmisiones
  • Obtén información sobre transmisiones y objetos de transmisión
  • Actualiza transmisiones iniciando, pausando, reanudando y modificando transmisiones, además de iniciar y detener el reabastecimiento de los objetos de transmisión.
  • Cómo recuperar las transmisiones con errores permanentes
  • Habilita la transmisión de objetos grandes para las transmisiones de Oracle
  • Borra transmisiones

Existen dos formas de usar la API de Datastream. Puedes hacer llamadas a la API de REST o usar Google Cloud CLI (CLI).

Si deseas obtener información de alto nivel sobre el uso de gcloud para administrar transmisiones de Datastream, consulta Flujos de Datastream de gcloud.

Crea una transmisión

En esta sección, aprenderás a crear un flujo que se use para transferir datos de tu origen a un destino. Los siguientes ejemplos no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso de uso específico, usa estos ejemplos junto con la documentación de referencia de la API de Datastream.

En esta sección, se abordan los siguientes casos de uso:

Ejemplo 1: Transmite objetos específicos a BigQuery

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de MySQL a BigQuery
  • Incluye un conjunto de objetos en la transmisión
  • Reabastece todos los objetos incluidos en la transmisión.

La siguiente es una solicitud para extraer todas las tablas de schema1 y dos tablas específicas de schema2: tableA y tableC. Los eventos se escriben en un conjunto de datos en BigQuery.

La solicitud no incluye el parámetro customerManagedEncryptionKey, por lo que se usa el sistema de administración de claves interno de Google Cloud para encriptar tus datos en lugar de CMEK.

El parámetro backfillAll asociado con realizar el reabastecimiento (o instantánea) histórico se establece en un diccionario vacío ({}), lo que significa que Datastream reabastece los datos históricos de todas las tablas incluidas en la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 2: Excluye objetos específicos de una transmisión con una fuente de PostgreSQL

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de PostgreSQL a BigQuery
  • Excluye objetos de la transmisión
  • Excluir objetos del reabastecimiento

El siguiente código muestra una solicitud para crear un flujo que se usa para transferir datos de una base de datos PostgreSQL de origen a BigQuery. Cuando creas una transmisión a partir de una base de datos PostgreSQL de origen, debes especificar dos campos adicionales específicos de PostgreSQL en la solicitud:

  • replicationSlot: Una ranura de replicación es un requisito previo para configurar una base de datos de PostgreSQL para la replicación. Debes crear una ranura de replicación para cada transmisión.
  • publication: Una publicación es un grupo de tablas desde el que quieres replicar cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar una transmisión. Como mínimo, la publicación debe incluir las tablas especificadas en la lista includeObjects de la transmisión.

El parámetro backfillAll asociado con la realización del reabastecimiento histórico (o instantánea) está configurado para excluir una tabla.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 3: Transmite a un destino de Cloud Storage

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de Oracle a Cloud Storage
  • Define un conjunto de objetos para incluir en la transmisión
  • Define CMEK para encriptar datos en reposo

En la siguiente solicitud, se muestra cómo crear una transmisión que escribe los eventos en un bucket de Cloud Storage.

En esta solicitud de ejemplo, los eventos se escriben en el formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (anulando los valores predeterminados de 50 MB y 60 segundos).

Para el formato JSON, puedes hacer lo siguiente:

  • Incluye un archivo de esquema de tipos unificados en la ruta. Como resultado, Datastream escribe dos archivos en Cloud Storage: uno de datos JSON y otro de esquema de Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, con una extensión .schema.

  • Habilita la compresión gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.

Cuando se usa el parámetro backfillNone, la solicitud especifica que solo los cambios en curso se transmiten al destino, sin reabastecimiento.

La solicitud especifica el parámetro de la clave de encriptación administrada por el cliente que te permite controlar las claves que se usan para encriptar datos en reposo dentro de un proyecto de Google Cloud. El parámetro hace referencia a las CMEK que usa Datastream para encriptar los datos que se transmiten desde el origen al destino. También especifica el llavero de claves para tu CMEK.

Para obtener más información sobre los llaveros de claves, consulta los recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptación, consulta Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Valida la definición de una transmisión

Antes de crear una transmisión, puedes validar su definición. De esta manera, podrás asegurarte de que se aprueben todas las verificaciones de validación y de que la transmisión se ejecutará correctamente cuando se cree.

La validación de una transmisión verifica lo siguiente:

  • Indica si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
  • Si la transmisión puede conectarse tanto al origen como al destino.
  • La configuración de extremo a extremo de la transmisión.

Para validar una transmisión, agrega &validate_only=true a la URL que aparece antes del cuerpo de tu solicitud:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Después de realizar esta solicitud, verás las verificaciones de validación que Datastream ejecuta para tu origen y destino, junto con si las verificaciones se aprueban o fallan. Para cualquier verificación de validación que no se apruebe, aparece información sobre el motivo del error y qué hacer para rectificar el problema.

Por ejemplo, supongamos que tienes una clave de encriptación administrada por el cliente (CMEK) que quieres que Datastream use para encriptar los datos que se transmiten desde el origen hasta el destino. Como parte de la validación de la transmisión, Datastream verificará que exista la clave y que tenga permisos para usarla. Si no se cumple cualquiera de estas condiciones, cuando valides la transmisión, se mostrará el siguiente mensaje de error:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Para resolver este problema, verifica que la clave que proporcionaste exista y que la cuenta de servicio de Datastream tenga el permiso cloudkms.cryptoKeys.get para la clave.

Después de hacer las correcciones apropiadas, vuelve a realizar la solicitud para asegurarte de que todas las verificaciones de validación se aprueben. Para el ejemplo anterior, la verificación de CMEK_VALIDATE_PERMISSIONS ya no mostrará un mensaje de error, pero tendrá el estado PASSED.

Obtén información sobre una transmisión

El siguiente código muestra una solicitud para recuperar información sobre una transmisión. Esta información incluye:

  • El nombre de la transmisión (identificador único)
  • Un nombre fácil de usar para la transmisión (nombre visible)
  • Marcas de tiempo de la última actualización y creación de la transmisión
  • Información sobre los perfiles de conexión de origen y destino asociados con la transmisión
  • El estado de la transmisión

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Aparecerá la respuesta de la siguiente manera:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre tu transmisión, haz clic aquí.

Enumera transmisiones

El siguiente código muestra una solicitud para recuperar una lista de todas las transmisiones en el proyecto y la ubicación especificados.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre todas tus transmisiones, haz clic aquí.

Enumera objetos de una transmisión

El siguiente código muestra una solicitud para recuperar información sobre todos los objetos de una transmisión.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre todos los objetos de tu transmisión, haz clic aquí.

La lista de objetos que se muestra puede ser similar a la siguiente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Si deseas obtener más información sobre el uso de gcloud para enumerar los objetos de una transmisión, haz clic aquí.

Iniciar una transmisión

El siguiente código muestra una solicitud para iniciar una transmisión.

Cuando usas el parámetro updateMask en la solicitud, solo los campos que especifiques deben incluirse en el cuerpo de la solicitud. Para iniciar una transmisión, cambia el valor en el campo state de CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para iniciar una transmisión, haz clic aquí.

Cómo iniciar una transmisión desde una posición específica

Puedes iniciar una transmisión desde una posición específica para las fuentes de MySQL y Oracle, por ejemplo, si deseas realizar un reabastecimiento mediante una herramienta externa, o iniciar una CDC desde un archivo de registro que indiques. En el caso de una fuente de MySQL, debes indicar una posición binlog; para una fuente de Oracle, un número de cambio del sistema (SCN) en el archivo de registro redo.

En el siguiente código, se muestra una solicitud para iniciar una transmisión ya creada desde una posición específica.

Inicia una transmisión desde una posición binlog específica (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas iniciar la transmisión.
  • POSITION: Es la posición en el archivo de registro desde el que deseas iniciar la transmisión. Si no proporcionas el valor, Datastream comienza a leer desde el encabezado del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

No se puede iniciar una transmisión desde una posición específica con gcloud. Si deseas obtener información sobre el uso de gcloud para iniciar una transmisión, consulta la documentación del SDK de Cloud.

Inicia una transmisión desde un número de cambio del sistema específico en el archivo de registro para rehacer (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn por el número de cambio del sistema (SCN) en el archivo de registro para rehacer desde el que deseas iniciar la transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

No se puede iniciar una transmisión desde una posición específica con gcloud. Si deseas obtener información sobre el uso de gcloud para iniciar una transmisión, consulta la documentación del SDK de Cloud.

Pausar una transmisión

En el siguiente código, se muestra una solicitud para pausar una transmisión en ejecución.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si pausas la transmisión, cambiarás su estado de RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Si quieres obtener más información para pausar la transmisión en vivo con gcloud, haz clic aquí.

Reanudar una transmisión

En el siguiente código, se muestra una solicitud para reanudar una transmisión en pausa.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si reanudas la transmisión, cambiarás su estado de PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para reanudar la transmisión, haz clic aquí.

Cómo recuperar una transmisión

Puedes recuperar una transmisión con errores de forma permanente para una fuente de MySQL, Oracle o PostgreSQL mediante el método RunStream. Cada tipo de base de datos de origen tiene su propia definición de qué operaciones de recuperación de transmisión son posibles. Para obtener más información, consulta Recupera una transmisión.

Recupera una transmisión para una fuente de MySQL o de Oracle

En las siguientes muestras de código, se muestran las solicitudes para recuperar una transmisión de una fuente de MySQL o de Oracle desde varias posiciones de archivos de registro:

REST

Recupera una transmisión de la posición actual. Esta es la opción predeterminada:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recupera una transmisión desde la siguiente posición disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recupera una transmisión desde la posición más reciente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recupera una transmisión desde una posición específica (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro del que deseas recuperar la transmisión.
  • POSITION: Es la posición en el archivo de registro del que deseas recuperar la transmisión. Si no proporcionas el valor, Datastream recupera la transmisión desde el encabezado del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recupera una transmisión desde una posición específica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn por el número de cambio del sistema (SCN) en el archivo de registro de rehacer del que deseas recuperar la transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Para obtener más información sobre las opciones de recuperación disponibles, consulta Recupera una transmisión.

gcloud

No se admite la recuperación de transmisiones con gcloud.

Recupera una transmisión de una fuente de PostgreSQL

En la siguiente muestra de código, se muestra una solicitud para recuperar una transmisión de una fuente de PostgreSQL. Durante la recuperación, la transmisión comienza a leer desde el primer número de secuencia de registro (LSN) en la ranura de replicación configurada para la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si quieres cambiar la ranura de replicación, primero actualiza la transmisión con el nombre de la ranura de replicación nueva:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

No se admite la recuperación de transmisiones con gcloud.

Modificar una transmisión

El siguiente código muestra una solicitud para actualizar la configuración de rotación de un archivo de transmisión para rotar el archivo cada 75 MB o 45 segundos.

En este ejemplo, los campos especificados para el parámetro updateMask incluyen los campos fileRotationMb y fileRotationInterval, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb y destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

En el siguiente código, se muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de acceso de los archivos que Datastream escribe en Cloud Storage. Como resultado, Datastream escribe dos archivos: uno de datos JSON y uno de esquema de Avro.

En este ejemplo, el campo especificado es el campo jsonFileFormat, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

En el siguiente código, se muestra una solicitud para que Datastream replique datos existentes, además de los cambios continuos en los datos, desde la base de datos de origen hacia el destino.

En la sección oracleExcludedObjects del código, se muestran las tablas y esquemas que están restringidas y no se pueden reabastecer en el destino.

Para este ejemplo, se reabastecerán todas las tablas y los esquemas, excepto tableA en schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para modificar tu transmisión, haz clic aquí.

Iniciar el reabastecimiento de un objeto de una transmisión

Una transmisión en Datastream puede reabastecer datos históricos y transmitir cambios en curso a un destino. Los cambios en curso siempre se transmitirán de un origen a un destino. Sin embargo, puedes especificar si deseas que se transmitan los datos históricos.

Si deseas que los datos históricos se transmitan del origen al destino, usa el parámetro backfillAll.

Datastream también te permite transmitir datos históricos solo para tablas de bases de datos específicas. Para ello, usa el parámetro backfillAll y excluye las tablas de las que no deseas datos históricos.

Si deseas que solo los cambios en curso se transmitan al destino, usa el parámetro backfillNone. Si luego quieres que Datastream transmita una instantánea de todos los datos existentes desde la fuente al destino, debes iniciar el reabastecimiento de forma manual para los objetos que contienen estos datos.

Otra razón para iniciar el reabastecimiento de un objeto es que los datos no estén sincronizados entre la fuente y el destino. Por ejemplo, un usuario puede borrar datos en el destino de forma involuntaria, y los datos ahora se pierden. En este caso, iniciar el reabastecimiento para el objeto sirve como un “mecanismo de restablecimiento” porque todos los datos se transmiten al destino en una sola toma. Como resultado, los datos se sincronizan entre el origen y el destino.

Antes de iniciar el reabastecimiento de un objeto de una transmisión, debes recuperar información sobre el objeto.

Cada objeto tiene un OBJECT_ID, que lo identifica de manera única. Usa OBJECT_ID para iniciar el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Si quieres obtener más información sobre el uso de gcloud para el reabastecimiento inicial de un objeto de tu transmisión, haz clic aquí.

Detén el reabastecimiento de un objeto de una transmisión

Después de iniciar el reabastecimiento para un objeto de una transmisión, puedes detener el reabastecimiento para el objeto. Por ejemplo, si un usuario modifica un esquema de base de datos, el esquema o los datos pueden dañarse. No quieres que este esquema o datos se transmitan al destino, por lo que debes detener el reabastecimiento del objeto.

También puedes detener el reabastecimiento de un objeto para fines de balanceo de cargas. Datastream puede ejecutar varios reabastecimientos en paralelo. Es posible que se agregue una carga adicional a la fuente. Si la carga es significativa, detén el reabastecimiento para cada objeto y, luego, inicia el reabastecimiento para los objetos, uno por uno.

Antes de detener el reabastecimiento de un objeto de una transmisión, debes realizar una solicitud para recuperar información sobre todos los objetos de una transmisión. Cada objeto que se muestra tiene un OBJECT_ID, que identifica al objeto de forma única. Usa OBJECT_ID para detener el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Si quieres obtener más información sobre el uso de gcloud para detener el reabastecimiento de un objeto de tu transmisión, haz clic aquí.

Cambia la cantidad máxima de tareas de CDC simultáneas

En el siguiente código, se muestra cómo establecer en 7 la cantidad máxima de tareas simultáneas de captura de datos modificados (CDC) para una transmisión de MySQL.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentCdcTasks. Si estableces su valor en 7, cambiarás la cantidad máxima de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores de 0 a 50 (inclusive). Si no defines el valor, o si lo defines como 0, se establecerá el valor predeterminado del sistema de 5 tareas para la transmisión.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Cambiar la cantidad máxima de tareas de reabastecimiento simultáneas

El siguiente código muestra cómo establecer la cantidad máxima de tareas de reabastecimiento simultáneas para una transmisión de MySQL en 25.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentBackfillTasks. Si estableces su valor en 25, cambias la cantidad máxima de tareas de reabastecimiento simultáneas del valor anterior a 25. Puedes usar valores de 0 a 50 (inclusive). Si no defines el valor, o si lo defines como 0, se establece el valor predeterminado del sistema de 16 tareas para la transmisión.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Habilita la transmisión de objetos grandes para las fuentes de Oracle

Puedes habilitar la transmisión de objetos grandes, como objetos binarios grandes (BLOB), objetos grandes de caracteres (CLOB) y objetos grandes de carácter nacional (NCLOB) para transmisiones con fuentes de Oracle. La marca streamLargeObjects te permite incluir objetos grandes en transmisiones nuevas y existentes. La marca se configura a nivel de transmisión, no necesitas especificar las columnas de tipos de datos de objetos grandes.

En el siguiente ejemplo, se muestra cómo crear una transmisión que te permite transmitir objetos grandes.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Si deseas obtener más información sobre el uso de gcloud para actualizar una transmisión, consulta la documentación del SDK de Google Cloud.

Borrar una transmisión

En el siguiente código, se muestra una solicitud para borrar una transmisión.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Si quieres obtener más información sobre el uso de gcloud para borrar tu transmisión, haz clic aquí.