Administrar transmisiones

Descripción general

En esta sección, aprenderás a usar la API de Datastream para hacer lo siguiente:

  • Crea transmisiones
  • Obtén información sobre transmisiones y objetos de transmisión
  • Actualiza las transmisiones iniciando, deteniendo, reanudando y modificando, así como iniciando y deteniendo el reabastecimiento de los objetos de transmisión.
  • Recupera las transmisiones con errores permanentes
  • Habilitar la transmisión de objetos grandes para las transmisiones de Oracle
  • Borra transmisiones

Existen dos formas de usar la API de Datastream. Puedes realizar llamadas a la API de REST o usar Google Cloud CLI (CLI).

Si quieres obtener información general sobre el uso de gcloud para administrar transmisiones de Datastream, consulta transmisiones de Datastream de gcloud.

Crea una transmisión

En esta sección, aprenderás a crear una transmisión que se use para transferir datos de tu origen a un destino. Los siguientes ejemplos no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso de uso específico, usa estos ejemplos junto con la documentación de referencia de la API de Datastream.

En esta sección, se abarcan los siguientes casos de uso:

Ejemplo 1: Transmite objetos específicos a BigQuery

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de MySQL a BigQuery
  • Cómo incluir un conjunto de objetos en la transmisión
  • Define el modo de escritura para la transmisión como "solo anexar"
  • Reabastece todos los objetos incluidos en la transmisión

La siguiente es una solicitud para extraer todas las tablas de schema1 y dos tablas específicas de schema2: tableA y tableC. Los eventos se escriben en un conjunto de datos en BigQuery.

La solicitud no incluye el parámetro customerManagedEncryptionKey; por lo tanto, se usa el sistema de administración de claves interno de Google Cloud para encriptar tus datos en lugar de CMEK.

El parámetro backfillAll asociado con la realización del reabastecimiento histórico (o de la instantánea) se establece en un diccionario vacío ({}), lo que significa que Datastream reabastece los datos históricos de todas las tablas incluidas en el flujo.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 2: Excluye objetos específicos de una transmisión con una fuente de PostgreSQL

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de PostgreSQL a BigQuery
  • Excluir objetos de la transmisión
  • Excluir objetos del reabastecimiento

El siguiente código muestra una solicitud para crear una transmisión que se usa para transferir datos desde una base de datos de origen de PostgreSQL a BigQuery. Cuando creas una transmisión desde una base de datos de PostgreSQL de origen, debes especificar dos campos adicionales específicos de PostgreSQL en tu solicitud:

  • replicationSlot: Una ranura de replicación es un requisito previo a fin de configurar una base de datos de PostgreSQL para la replicación. Debes crear una ranura de replicación para cada transmisión.
  • publication: Una publicación es un grupo de tablas desde las que deseas replicar los cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar una transmisión. Como mínimo, la publicación debe incluir las tablas especificadas en la lista includeObjects de la transmisión.

El parámetro backfillAll asociado con el reabastecimiento histórico (o instantánea) se configuró para excluir una tabla.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 3: Especifica el modo de escritura de solo anexo para una transmisión

Cuando transmites a BigQuery, puedes definir el modo de escritura: merge o appendOnly. Para obtener más información, consulta Configura el modo de escritura.

Si no especificas el modo de escritura en tu solicitud para crear una transmisión, se usa el modo merge predeterminado.

En la siguiente solicitud, se muestra cómo definir el modo appendOnly cuando creas una transmisión de MySQL a BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 4: Transmite a un destino de Cloud Storage

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de Oracle a Cloud Storage
  • Definir un conjunto de objetos para incluir en la transmisión
  • Define CMEK para encriptar datos en reposo

La siguiente solicitud muestra cómo crear una transmisión que escriba los eventos en un bucket de Cloud Storage.

En esta solicitud de ejemplo, los eventos se escriben en formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (y se anulan los valores predeterminados de 50 MB y 60 segundos).

Para el formato JSON, puedes hacer lo siguiente:

  • Incluye un archivo de esquema de tipos unificados en la ruta. Como resultado, Datastream escribe dos archivos en Cloud Storage: un archivo de datos JSON y un archivo de esquema Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, con una extensión .schema.

  • Habilita la compresión gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.

Cuando se usa el parámetro backfillNone, la solicitud especifica que solo se transmitirán los cambios en curso al destino, sin reabastecimiento.

La solicitud especifica el parámetro de clave de encriptación administrada por el cliente, que te permite controlar las claves que se usan para encriptar los datos en reposo dentro de un proyecto de Google Cloud. El parámetro hace referencia a la CMEK que usa Datastream para encriptar los datos que se transmiten desde el origen hasta el destino. También especifica el llavero de claves para tu CMEK.

Para obtener más información sobre los llaveros de claves, consulta los recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptación, consulta Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Valida la definición de una transmisión

Antes de crear una transmisión, puedes validar su definición. De esta manera, puedes asegurarte de que se aprueben todas las verificaciones de validación y de que la transmisión se ejecute correctamente cuando se cree.

La validación de una transmisión verifica lo siguiente:

  • Indica si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
  • Si la transmisión puede conectarse tanto al origen como al destino.
  • Es la configuración de extremo a extremo de la transmisión.

Para validar una transmisión, agrega &validate_only=true a la URL que antecede al cuerpo de tu solicitud:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Después de realizar esta solicitud, verás las verificaciones de validación que Datastream ejecuta para tu origen y destino, junto con las verificaciones que se aprobaron o no. En el caso de cualquier verificación de validación que no se apruebe, se muestra información sobre el motivo del error y qué hacer para rectificar el problema.

Por ejemplo, supongamos que tienes una clave de encriptación administrada por el cliente (CMEK) que quieres que Datastream use para encriptar los datos que se transmiten desde el origen hasta el destino. Como parte de la validación de la transmisión, Datastream verificará que la clave exista y que tenga permisos para usarla. Si no se cumple alguna de estas condiciones, cuando valides la transmisión, se devolverá el siguiente mensaje de error:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Para resolver este problema, verifica que la clave que proporcionaste exista y que la cuenta de servicio de Datastream tenga el permiso cloudkms.cryptoKeys.get para la clave.

Después de hacer las correcciones apropiadas, vuelve a realizar la solicitud para asegurarte de que todas las verificaciones de validación se aprueben. En el ejemplo anterior, la verificación de CMEK_VALIDATE_PERMISSIONS ya no mostrará un mensaje de error, pero tendrá un estado de PASSED.

Obtén información sobre una transmisión

El siguiente código muestra una solicitud para recuperar información sobre una transmisión. Esta información incluye:

  • El nombre de la transmisión (identificador único)
  • Un nombre fácil de usar para la transmisión (nombre visible)
  • Las marcas de tiempo de cuándo se creó y actualizó la transmisión
  • Información sobre los perfiles de conexión de origen y destino asociados con la transmisión
  • El estado de la transmisión

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Aparece la respuesta, como se muestra a continuación:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre tu transmisión, haz clic aquí.

Enumera transmisiones

El siguiente código muestra una solicitud para recuperar una lista de todas las transmisiones en el proyecto y la ubicación especificados.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre todas tus transmisiones, haz clic aquí.

Enumera objetos de una transmisión

El siguiente código muestra una solicitud para recuperar información sobre todos los objetos de una transmisión.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre todos los objetos de tu transmisión, haz clic aquí.

La lista de objetos que se muestra puede ser similar a la siguiente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para enumerar los objetos de una transmisión, haz clic aquí.

Iniciar una transmisión

El siguiente código muestra una solicitud para iniciar una transmisión.

Cuando usas el parámetro updateMask en la solicitud, solo se deben incluir los campos que especifiques en el cuerpo de la solicitud. Para iniciar una transmisión, cambia el valor en el campo state de CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para iniciar tu transmisión, haz clic aquí.

Pausar una transmisión

El siguiente código muestra una solicitud para pausar una transmisión en ejecución.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si pausas la transmisión, cambiarás su estado de RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para pausar la transmisión, haz clic aquí.

Reanudar una transmisión

El siguiente código muestra una solicitud para reanudar una transmisión en pausa.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si reanudas la transmisión, cambiarás su estado de PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para reanudar la transmisión, haz clic aquí.

Cómo recuperar una transmisión

Puedes recuperar una transmisión con errores de forma permanente para una fuente de MySQL, Oracle o PostgreSQL con el método RunStream. Cada tipo de base de datos de origen tiene su propia definición de las operaciones de recuperación de transmisión. Para obtener más información, consulta Cómo recuperar una transmisión.

Recupera una transmisión para una fuente de MySQL o de Oracle

En las siguientes muestras de código, se muestran solicitudes para recuperar una transmisión para una fuente de MySQL o de Oracle desde varias posiciones de archivos de registro:

REST

Recupera una transmisión desde la posición actual. Esta es la opción predeterminada:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recupera una transmisión desde la siguiente posición disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Cómo recuperar una transmisión desde la posición más reciente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recupera una transmisión de una posición específica (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas recuperar la transmisión.
  • POSITION: Es la posición en el archivo de registro desde la que deseas recuperar la transmisión. Si no proporcionas el valor, Datastream recupera la transmisión desde el encabezado del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recupera una transmisión de una posición específica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn por el número de cambio del sistema (SCN) en el archivo de registro de rehacer desde el que deseas recuperar la transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Para obtener más información sobre las opciones de recuperación disponibles, consulta Recupera una transmisión.

gcloud

No se puede recuperar una transmisión usando gcloud.

Recupera una transmisión de una fuente de PostgreSQL

En la siguiente muestra de código, se presenta una solicitud para recuperar una transmisión de una fuente de PostgreSQL. Durante la recuperación, la transmisión comienza a leer desde el primer número de secuencia de registro (LSN) en la ranura de replicación configurada para la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si quieres cambiar la ranura de replicación, primero actualiza la transmisión con el nombre de la ranura de replicación nuevo:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

No se puede recuperar una transmisión usando gcloud.

Inicia o reanuda una transmisión desde una posición específica

Puedes iniciar una transmisión o reanudar una pausada desde una posición específica para fuentes de MySQL y Oracle. Esto puede ser útil cuando deseas realizar el reabastecimiento con una herramienta externa o iniciar la CDC desde una posición que indiques. En el caso de una fuente de MySQL, debes indicar una posición binlog, para una fuente de Oracle, un número de cambio del sistema (SCN) en el archivo de registro de rehacer.

El siguiente código muestra una solicitud para iniciar o reanudar una transmisión ya creada desde una posición específica.

Inicia o reanuda una transmisión desde una posición específica de binlog (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas iniciar la transmisión.
  • POSITION: Es la posición en el archivo de registro desde el que deseas iniciar la transmisión. Si no proporcionas el valor, Datastream comenzará a leer desde el encabezado del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

No se admite el inicio ni la reanudación de una transmisión desde una posición específica mediante gcloud. Si quieres obtener información sobre el uso de gcloud para iniciar o reanudar una transmisión, consulta la documentación del SDK de Cloud.

Inicia o reanuda una transmisión desde un número de cambio del sistema específico en el archivo de registro de rehacer (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn por el número de cambio del sistema (SCN) en el archivo de registro de rehacer desde el que deseas iniciar la transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

No se admite el inicio ni la reanudación de una transmisión desde una posición específica mediante gcloud. Si quieres obtener información sobre el uso de gcloud para iniciar una transmisión, consulta la documentación del SDK de Cloud.

Modificar una transmisión

El siguiente código muestra una solicitud para actualizar la configuración de rotación de archivos de una transmisión para rotar el archivo cada 75 MB o 45 segundos.

En este ejemplo, los campos especificados para el parámetro updateMask incluyen los campos fileRotationMb y fileRotationInterval, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb y destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Con el siguiente código, se muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de acceso de los archivos que Datastream escribe en Cloud Storage. Como resultado, Datastream escribe dos archivos: un archivo de datos JSON y un archivo de esquema Avro.

En este ejemplo, el campo especificado es el campo jsonFileFormat, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

En el siguiente código, se muestra una solicitud para que Datastream replique los datos existentes, además de los cambios en curso, desde la base de datos de origen hasta el destino.

En la sección oracleExcludedObjects del código, se muestran las tablas y esquemas que están restringidas y no se pueden reabastecer en el destino.

Para este ejemplo, se reabastecerán todas las tablas y los esquemas, excepto tableA en schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Si quieres obtener más información sobre el uso de gcloud para modificar tu transmisión, haz clic aquí.

Iniciar el reabastecimiento de un objeto de una transmisión

Una transmisión en Datastream puede reabastecer datos históricos, así como transmitir cambios en curso a un destino. Los cambios en curso siempre se transmitirán desde una fuente a un destino. Sin embargo, puedes especificar si deseas que se transmitan los datos históricos.

Si deseas que los datos históricos se transmitan de la fuente al destino, usa el parámetro backfillAll.

Datastream también te permite transmitir datos históricos solo para tablas de base de datos específicas. Para ello, usa el parámetro backfillAll y excluye las tablas para las que no deseas obtener datos históricos.

Si deseas que solo se transmitan al destino los cambios en curso, usa el parámetro backfillNone. Si luego quieres que Datastream transmita una instantánea de todos los datos existentes de la fuente al destino, debes iniciar el reabastecimiento de forma manual para los objetos que contengan estos datos.

Otra razón para iniciar el reabastecimiento de un objeto es si los datos no están sincronizados entre el origen y el destino. Por ejemplo, un usuario puede borrar datos en el destino de manera involuntaria y estos se pierden. En este caso, iniciar el reabastecimiento para el objeto sirve como un “mecanismo de restablecimiento” porque todos los datos se transmiten al destino en una sola toma. Como resultado, los datos se sincronizan entre el origen y el destino.

Antes de iniciar el reabastecimiento de un objeto de una transmisión, debes recuperar información sobre el objeto.

Cada objeto tiene un OBJECT_ID, que lo identifica de manera única. Usa OBJECT_ID para iniciar el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Si quieres obtener más información sobre el uso de gcloud para iniciar el reabastecimiento de un objeto de la transmisión, consulta la documentación del SDK de Google Cloud.

Detén el reabastecimiento de un objeto de una transmisión

Después de iniciar el reabastecimiento de un objeto de una transmisión, puedes detener el reabastecimiento del objeto. Por ejemplo, si un usuario modifica un esquema de base de datos, el esquema o los datos podrían dañarse. No quieres que este esquema o estos datos se transmitan al destino, por lo que detienes el reabastecimiento del objeto.

También puedes detener el reabastecimiento de un objeto con fines de balanceo de cargas. Datastream puede ejecutar varios reabastecimientos en paralelo. Esto puede agregar una carga adicional a la fuente. Si la carga es significativa, detén el reabastecimiento de cada objeto y, luego, inicia el reabastecimiento de los objetos, uno por uno.

Antes de detener el reabastecimiento de un objeto de una transmisión, debes realizar una solicitud para recuperar información sobre todos los objetos de una transmisión. Cada objeto que se muestra tiene un OBJECT_ID, que identifica al objeto de forma única. Usa el OBJECT_ID para detener el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Si quieres obtener más información sobre el uso de gcloud para detener el reabastecimiento de un objeto de tu transmisión, haz clic aquí.

Cambia la cantidad máxima de tareas de CDC simultáneas

En el siguiente código, se muestra cómo establecer la cantidad máxima de tareas de captura de datos modificados simultáneos (CDC) para una transmisión de MySQL en 7.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentCdcTasks. Cuando estableces su valor en 7, se cambia la cantidad máxima de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores de 0 a 50 (inclusive). Si no define el valor, o si lo define como 0, se establece el valor predeterminado del sistema de 5 tareas para el flujo.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Cambiar la cantidad máxima de tareas de reabastecimiento simultáneas

En el siguiente código, se muestra cómo establecer la cantidad máxima de tareas de reabastecimiento simultáneas para una transmisión de MySQL en 25.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentBackfillTasks. Si estableces su valor en 25, cambiarás la cantidad máxima de tareas de reabastecimiento simultáneas del valor anterior a 25. Puedes usar valores de 0 a 50 (inclusive). Si no define el valor, o si lo define como 0, se establece el valor predeterminado del sistema de 16 tareas para el flujo.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Habilitar la transmisión de objetos grandes para fuentes de Oracle

Puedes habilitar la transmisión de objetos grandes, como objetos binarios grandes (BLOB), objetos grandes de caracteres (CLOB) y objetos grandes de caracteres nacionales (NCLOB) para transmisiones con fuentes de Oracle. La marca streamLargeObjects te permite incluir objetos grandes en transmisiones nuevas y existentes. La marca se establece a nivel de transmisión, no es necesario especificar las columnas de los tipos de datos de objetos grandes.

En el siguiente ejemplo, se muestra cómo crear una transmisión que te permite transmitir objetos grandes.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para actualizar una transmisión, consulta la documentación del SDK de Google Cloud.

Borrar una transmisión

En el siguiente código, se muestra una solicitud para borrar una transmisión.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Si quieres obtener más información para usar gcloud para borrar tu transmisión, haz clic aquí.