Administrar transmisiones

Descripción general

En esta sección, aprenderás a usar la API de Datastream para hacer lo siguiente:

  • Crea transmisiones
  • Obtén información sobre transmisiones y objetos de transmisión
  • Actualiza las transmisiones iniciando, deteniendo, reanudando y modificando, así como iniciando y deteniendo el reabastecimiento de los objetos de transmisión.
  • Cómo recuperar transmisiones que fallaron de forma permanente
  • Habilitar la transmisión de objetos grandes para las transmisiones de Oracle
  • Borra transmisiones

Existen dos formas de usar la API de Datastream. Puedes realizar llamadas a la API de REST o usar Google Cloud CLI (CLI).

Si quieres obtener información general sobre el uso de gcloud para administrar transmisiones de Datastream, consulta transmisiones de Datastream de gcloud.

Crea una transmisión

En esta sección, aprenderás a crear una transmisión que se use para transferir datos de tu origen a un destino. Los siguientes ejemplos no son exhaustivos, sino que destacan funciones específicas de Datastream. Para abordar tu caso de uso específico, usa estos ejemplos junto con la documentación de referencia de la API de Datastream.

En esta sección, se abarcan los siguientes casos de uso:

Ejemplo 1: Transmite objetos específicos a BigQuery

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de MySQL a BigQuery
  • Cómo incluir un conjunto de objetos en la transmisión
  • Define el modo de escritura para la transmisión como "solo anexar"
  • Reabastece todos los objetos incluidos en la transmisión

La siguiente es una solicitud para extraer todas las tablas de schema1 y dos tipos tablas de schema2: tableA y tableC. Los eventos se escriben en un conjunto de datos en BigQuery.

La solicitud no incluye el parámetro customerManagedEncryptionKey, por lo que se usa el sistema de administración de claves interno de Google Cloud para encriptar tus datos en lugar de CMEK.

El parámetro backfillAll asociado con el rendimiento del historial El reabastecimiento (o instantánea) está configurado en un diccionario vacío ({}), lo que significa que Datastream reabastece los datos históricos de todas las tablas incluidas en la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Para obtener más información sobre cómo usar gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 2: Excluye objetos específicos de una transmisión con una fuente de PostgreSQL

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de PostgreSQL a BigQuery
  • Excluir objetos de la transmisión
  • Excluir objetos del reabastecimiento

El siguiente código muestra una solicitud para crear una transmisión que se usa para transferir datos desde una base de datos de origen de PostgreSQL a BigQuery. Cuando creas una transmisión a partir de una base de datos de PostgreSQL de origen, debes especificar dos campos adicionales específicos de PostgreSQL en tu solicitud:

  • replicationSlot: Un espacio de replicación es un requisito previo para configurar una base de datos de PostgreSQL para la replicación. Debes crear una ranura de replicación para cada transmisión.
  • publication: Una publicación es un grupo de tablas desde las que deseas replicar los cambios. El nombre de la publicación debe existir en la base de datos antes de iniciar una transmisión. Como mínimo, la publicación debe incluir las tablas especificadas en la lista includeObjects de la transmisión.

El parámetro backfillAll asociado con el reabastecimiento histórico (o instantánea) se configuró para excluir una tabla.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Para obtener más información sobre cómo usar gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 3: Especifica el modo de escritura de solo anexar para una transmisión

Cuando transmites datos a BigQuery, puedes definir el modo de escritura: merge o appendOnly. Para obtener más información, consulta Cómo configurar el modo de escritura.

Si no especificas el modo de escritura en tu solicitud para crear un flujo, se usa el modo merge predeterminada.

En la siguiente solicitud, se muestra cómo definir el modo appendOnly cuando creas una transmisión de MySQL a BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Para obtener más información sobre cómo usar gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Ejemplo 4: Transmite a un destino de Cloud Storage

En este ejemplo, aprenderás a hacer lo siguiente:

  • Transmite de Oracle a Cloud Storage
  • Define un conjunto de objetos para incluir en la transmisión
  • Define CMEK para encriptar datos en reposo

En la siguiente solicitud, se muestra cómo crear una transmisión que escriba los eventos en un bucket de Cloud Storage.

En esta solicitud de ejemplo, los eventos se escriben en el formato de salida JSON y se crea un archivo nuevo cada 100 MB o 30 segundos (se anulan los valores predeterminados de 50 MB y 60 segundos).

En el formato JSON, puedes hacer lo siguiente:

  • Incluye un archivo de esquema de tipos unificados en la ruta de acceso. Como resultado, Datastream escribe dos archivos en Cloud Storage: un archivo de datos JSON y un archivo de esquema Avro. El archivo de esquema tiene el mismo nombre que el archivo de datos, con una extensión .schema.

  • Habilita la compresión gzip para que Datastream comprima los archivos que se escriben en Cloud Storage.

Cuando se usa el parámetro backfillNone, la solicitud especifica que solo los cambios en curso se transmiten al destino, sin reabastecimiento.

La solicitud especifica el parámetro de clave de encriptación administrada por el cliente, que te permite controlar las claves que se usan para encriptar los datos en reposo dentro de un proyecto de Google Cloud. El parámetro hace referencia a las CMEK que usa Datastream para encriptar los datos que se transmiten desde el origen hasta el destino. También especifica el llavero de claves de tu CMEK.

Para obtener más información sobre los llaveros de claves, consulta los recursos de Cloud KMS. Para obtener más información sobre cómo proteger tus datos con claves de encriptación, consulta Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Para obtener más información sobre cómo usar gcloud para crear una transmisión, consulta la documentación del SDK de Google Cloud.

Valida la definición de una transmisión

Antes de crear una transmisión, puedes validar su definición. De esta manera, puedes asegurarte de que se aprueben todas las verificaciones de validación y de que la transmisión se ejecute correctamente cuando se cree.

La validación de una transmisión verifica lo siguiente:

  • Indica si la fuente está configurada correctamente para permitir que Datastream transmita datos desde ella.
  • Si la transmisión puede conectarse tanto al origen como al destino.
  • La configuración de extremo a extremo de la transmisión.

Para validar una transmisión, agrega &validate_only=true a la URL que antecede al cuerpo de tu solicitud:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Después de realizar esta solicitud, verás las verificaciones de validación que Datastream ejecuta para tu origen y destino, junto con las verificaciones que se aprobaron o no. En el caso de cualquier verificación de validación que no se apruebe, se muestra información sobre el motivo del error y qué hacer para rectificar el problema.

Por ejemplo, supongamos que tienes una clave de encriptación administrada por el cliente (CMEK) que quieres que Datastream use para encriptar los datos que se transmiten desde el origen hasta el destino. Como parte de la validación de la transmisión, Datastream verificará que la clave exista y que Datastream tenga permisos para usarla. Si no se cumple alguna de estas condiciones, cuando valides la transmisión, se mostrará el siguiente mensaje de error:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Para resolver este problema, verifica que la clave que proporcionaste exista y que la cuenta de servicio de Datastream tenga el permiso cloudkms.cryptoKeys.get para la clave.

Después de hacer las correcciones apropiadas, vuelve a realizar la solicitud para asegurarte de que todas las verificaciones de validación se aprueben. En el ejemplo anterior, la verificación de CMEK_VALIDATE_PERMISSIONS ya no mostrará un mensaje de error, pero tendrá un estado de PASSED.

Cómo obtener información sobre una transmisión

En el siguiente código, se muestra una solicitud para recuperar información sobre una transmisión. Esta información incluye:

  • El nombre de la transmisión (identificador único)
  • Un nombre fácil de usar para la transmisión (nombre visible)
  • Marcas de tiempo de la creación y la última actualización del flujo
  • Información sobre los perfiles de conexión de origen y destino asociados con la transmisión
  • El estado de la transmisión

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Aparece la respuesta, como se muestra a continuación:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para recuperar información sobre tu transmisión, haz clic aquí.

Enumera transmisiones

En el siguiente código, se muestra una solicitud para recuperar una lista de todas las transmisiones en el proyecto y la ubicación especificados.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Si deseas obtener más información para usar gcloud y recuperar información sobre todas tus transmisiones, haz clic aquí.

Enumera objetos de una transmisión

En el siguiente código, se muestra una solicitud para recuperar información sobre todos los objetos de un flujo.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Si deseas obtener más información para usar gcloud y recuperar información sobre todos los objetos de tu transmisión, haz clic aquí.

La lista de objetos que se muestra puede ser similar a la siguiente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para enumerar los objetos de una transmisión, haz clic aquí.

Iniciar una transmisión

El siguiente código muestra una solicitud para iniciar una transmisión.

Cuando usas el parámetro updateMask en la solicitud, solo se deben incluir los campos que especifiques en el cuerpo de la solicitud. Para iniciar una transmisión, cambia el valor en el campo state de CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Para obtener más información sobre el uso de gcloud para iniciar la transmisión, haz clic aquí.

Pausar una transmisión

En el siguiente código, se muestra una solicitud para pausar una transmisión en ejecución.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si pausas la transmisión, cambiarás su estado de RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para pausar la transmisión, haz clic aquí.

Reanudar una transmisión

En el siguiente código, se muestra una solicitud para reanudar una transmisión en pausa.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo state. Si reanudas la transmisión, cambiarás su estado de PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Si quieres obtener más información sobre el uso de gcloud para reanudar la transmisión, haz clic aquí.

Cómo recuperar una transmisión

Puedes recuperar una transmisión con errores permanentes con el método RunStream. Cada El tipo de base de datos de origen tiene su propia definición de las operaciones de recuperación de transmisión son posibles. Para obtener más información, consulta Cómo recuperar una transmisión.

Cómo recuperar una transmisión para una fuente de MySQL o Oracle

En las siguientes muestras de código, se muestran solicitudes para recuperar una transmisión de una fuente de MySQL o Oracle desde varias posiciones de archivos de registro:

REST

Recupera una transmisión desde la posición actual. Esta es la opción predeterminada:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recupera una transmisión desde la siguiente posición disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Cómo recuperar una transmisión desde la posición más reciente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recupera una transmisión de una posición específica (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: El nombre del archivo de registro del que deseas para recuperar tu transmisión
  • POSITION: Es la posición en el archivo de registro desde la que deseas realizar la acción. recuperar tu transmisión. Si no proporcionas el valor, Datastream se recupera la transmisión desde el encabezado del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recupera una transmisión de una posición específica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn con el número de cambio del sistema (SCN) en el archivo de registro de rehacer del que deseas recuperar tu transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Para obtener más información sobre las opciones de recuperación disponibles, consulta Cómo recuperar una transmisión.

gcloud

No se admite la recuperación de una transmisión con gcloud.

Cómo recuperar una transmisión para una fuente de PostgreSQL

En la siguiente muestra de código, aparece una solicitud para recuperar una transmisión de PostgreSQL fuente. Durante la recuperación, la transmisión comienza a leer desde la primera secuencia de registro (LSN) en la ranura de replicación configurada para la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Si quieres cambiar la ranura de replicación, actualiza la transmisión con el nuevo nombre de la ranura de replicación primero:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

No se admite la recuperación de una transmisión con gcloud.

Recupera una transmisión para una fuente de SQL Server

En las siguientes muestras de código, se presentan solicitudes de ejemplo para recuperar una transmisión de un SQL Fuente del servidor.

REST

Recupera una transmisión desde la primera posición disponible:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Recupera una transmisión desde un número de secuencia de registro preferido:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

No se admite la recuperación de una transmisión con gcloud.

Inicia o reanuda una transmisión desde una posición específica

Puedes iniciar una transmisión o reanudar una pausada desde una posición específica por Fuentes de MySQL y Oracle. Esto puede ser útil cuando deseas reabastecer con una herramienta externa o iniciar el CDC desde una posición que indiques. Para un fuente de MySQL, debes indicar una posición binlog; para una fuente de Oracle, un el número de cambio del sistema (SCN) en el archivo de registro para rehacer.

El siguiente código muestra una solicitud para iniciar o reanudar una transmisión ya creada desde una posición específica.

Inicia o reanuda una transmisión desde una posición específica de binlog (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Reemplaza lo siguiente:

  • NAME_OF_THE_LOG_FILE: Es el nombre del archivo de registro desde el que deseas iniciar la transmisión.
  • POSITION: Es la posición en el archivo de registro desde el que deseas comenzar. tu flujo. Si no proporcionas el valor, Datastream comenzará a leer desde el principio del archivo.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

No se admite el inicio ni la reanudación de una transmisión desde una posición específica mediante gcloud. Si quieres obtener información sobre el uso de gcloud para iniciar o reanudar una transmisión, consulta la documentación del SDK de Cloud.

Iniciar o reanudar una transmisión desde un número de cambio del sistema específico en el archivo de registro de redo (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Reemplaza scn con el número de cambio del sistema (SCN) en el archivo de registro de rehacer desde la que desees comenzar tu transmisión. Este campo es obligatorio.

Por ejemplo:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

No se admite el inicio ni la reanudación de una transmisión desde una posición específica mediante gcloud. Para obtener información sobre cómo usar gcloud para iniciar una transmisión, consulta la documentación del SDK de Cloud.

Modificar una transmisión

En el siguiente código, se muestra una solicitud para actualizar la configuración de rotación de archivos de una transmisión para que el archivo se rote cada 75 MB o 45 segundos.

En este ejemplo, los campos especificados para el parámetro updateMask incluyen los campos fileRotationMb y fileRotationInterval, representados por las marcas destinationConfig.gcsDestinationConfig.fileRotationMb y destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

El siguiente código muestra una solicitud para incluir un archivo de esquema de tipos unificados en la ruta de acceso de los archivos que Datastream escribe en Cloud Storage. Como resultado, Datastream escribe dos archivos: un archivo de datos JSON y un archivo de esquema Avro.

En este ejemplo, el campo especificado es el campo jsonFileFormat, representado por la marca destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

En el siguiente código, se muestra una solicitud para que Datastream replique los datos existentes, además de los cambios continuos en los datos, de la base de datos de origen al destino.

En la sección oracleExcludedObjects del código, se muestran las tablas y esquemas que están restringidas y no se pueden reabastecer en el destino.

Para este ejemplo, se reabastecerán todas las tablas y los esquemas, excepto tableA en schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Para obtener más información sobre el uso de gcloud para modificar tu transmisión, haz clic aquí.

Inicia el reabastecimiento de un objeto de una transmisión

Una transmisión en Datastream puede reabastecer datos históricos, así como transmitir cambios continuos a un destino. Los cambios en curso siempre se transmitirán desde una fuente a un destino. Sin embargo, puedes especificar si deseas que se transmitan datos históricos.

Si deseas que los datos históricos se transmitan desde la fuente al destino, usa el parámetro backfillAll.

Datastream también te permite transmitir datos históricos solo para tablas de base de datos específicas. Para ello, usa el parámetro backfillAll y excluye las tablas para las que no deseas datos históricos.

Si deseas que solo se transmitan los cambios en curso al destino, usa el parámetro backfillNone. Si deseas que Datastream transmita una instantánea de todos los datos existentes de la fuente al destino, debes iniciar el reabastecimiento de forma manual para los objetos que contienen estos datos.

Otro motivo para iniciar el reabastecimiento de un objeto es si los datos no están sincronizados entre la fuente y el destino. Por ejemplo, un usuario puede borrar datos en el destino de manera involuntaria y estos se pierden. En este caso, iniciar el reabastecimiento del objeto funciona como un “mecanismo de restablecimiento” porque todos los datos se transmiten al destino de una sola vez. Como resultado, los datos se sincronizan entre el origen y el destino.

Antes de iniciar el reabastecimiento de un objeto de una transmisión, debes recuperar información sobre el objeto.

Cada objeto tiene un OBJECT_ID, que lo identifica de forma inequívoca. Usa OBJECT_ID para iniciar el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Si quieres obtener más información sobre el uso de gcloud para iniciar el reabastecimiento de un objeto de la transmisión, consulta la documentación del SDK de Google Cloud.

Detén el reabastecimiento de un objeto de una transmisión

Después de iniciar el reabastecimiento de un objeto de una transmisión, puedes detener el reabastecimiento del objeto. Por ejemplo, si un usuario modifica un esquema de base de datos, el esquema o los datos podrían dañarse. No quieres que este esquema o estos datos se transmitan al destino, por lo que detienes el reabastecimiento del objeto.

También puedes detener el reabastecimiento de un objeto con fines de balanceo de cargas. Datastream puede ejecutar varios reabastecimientos en paralelo. Es posible que se agregue carga adicional a la fuente. Si la carga es significativa, detén el reabastecimiento de cada objeto y, luego, inícialo uno por uno.

Antes de detener el reabastecimiento de un objeto de una transmisión, debes realizar una solicitud para recuperar información sobre todos los objetos de una transmisión. Cada objeto que se muestra tiene un OBJECT_ID, que identifica al objeto de forma única. Usa el OBJECT_ID para detener el reabastecimiento de la transmisión.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Para obtener más información sobre el uso de gcloud para detener el reabastecimiento de un objeto de tu transmisión, haz clic aquí.

Cambia la cantidad máxima de tareas de CDC simultáneas

En el siguiente código, se muestra cómo establecer la cantidad máxima de tareas de captura de datos de cambios (CDC) simultáneas para un flujo de MySQL en 7.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentCdcTasks. Si estableces su valor en 7, cambias la cantidad máxima de tareas de CDC simultáneas del valor anterior a 7. Puedes usar valores de 0 a 50 (inclusive). Si no define el valor, o si lo define como 0, se establece el valor predeterminado del sistema de 5 tareas para el flujo.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Cambiar la cantidad máxima de tareas de reabastecimiento simultáneas

En el siguiente código, se muestra cómo establecer la cantidad máxima de tareas de reabastecimiento simultáneas para una transmisión de MySQL en 25.

En este ejemplo, el campo especificado para el parámetro updateMask es el campo maxConcurrentBackfillTasks. Si estableces su valor en 25, cambias la cantidad máxima de tareas de reabastecimiento simultáneas del valor anterior a 25. Puedes usar valores de 0 a 50 (inclusive). Si no defines el valor o lo defines como 0, se establece el valor predeterminado del sistema de 16 tareas para la transmisión.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Para obtener más información sobre el uso de gcloud, haz clic aquí.

Habilitar la transmisión de objetos grandes para fuentes de Oracle

Puedes habilitar la transmisión de objetos grandes, como objetos binarios grandes (BLOB), objetos grandes de caracteres (CLOB) y objetos grandes de caracteres nacionales (NCLOB) para las transmisiones con fuentes de Oracle. La marca streamLargeObjects te permite incluir objetos grandes en transmisiones nuevas y existentes. La marca se establece a nivel del flujo, así que no necesitas especificar las columnas de los tipos de datos de objetos grandes.

En el siguiente ejemplo, se muestra cómo crear una transmisión que te permita transmitir en grandes objetos.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Para obtener más información sobre cómo usar gcloud para actualizar una transmisión, consulta la documentación del SDK de Google Cloud.

Borrar una transmisión

En el siguiente código, se muestra una solicitud para borrar una transmisión.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Si quieres obtener más información para usar gcloud para borrar tu transmisión, haz clic aquí.

¿Qué sigue?