Visão geral
Nesta seção, você aprenderá a usar a API Datastream para:
- Criar streams
- Receber informações sobre streams e objetos de stream
- Atualizar streams iniciando, pausando, retomando e modificando-os, bem como iniciando e interrompendo o preenchimento de objetos do stream
- Recuperar streams que falharam permanentemente
- Ativar o streaming de objetos grandes para streams do Oracle
- Excluir fluxos
Há duas maneiras de usar a API Datastream. É possível fazer chamadas de API REST ou usar a CLI (CLI) do Google Cloud.
Para informações gerais sobre como usar gcloud
para gerenciar streams do Datastream, consulte gcloud Datastream streams.
Criar um stream
Nesta seção, você vai aprender a criar um stream usado para transferir dados da origem para um destino. Os exemplos a seguir não são abrangentes, mas destacam características específicas do Datastream. Para lidar com seu caso de uso específico, use estes exemplos com a documentação de referência da API do Datastream.
Esta seção aborda os seguintes casos de uso:
- Faça streaming do Oracle para o Cloud Storage
- Fazer streaming do MySQL para o BigQuery
- Fazer streaming do PostgreSQL para o BigQuery
- Definir um conjunto de objetos para incluir no stream
- Preencha todos os objetos incluídos no stream
- Excluir objetos do fluxo
- Excluir objetos do preenchimento
- Definir a CMEK para criptografar dados em repouso
- Definir o modo de gravação para um stream
Exemplo 1: fazer streaming de objetos específicos para o BigQuery
Neste exemplo, você vai aprender a:
- Fazer streaming do MySQL para o BigQuery
- Incluir um conjunto de objetos no stream
- Definir o modo de gravação do stream como somente anexação
- Preencher todos os objetos incluídos no stream
Veja a seguir uma solicitação para extrair todas as tabelas de schema1
e duas tabelas
tabelas de schema2
: tableA
e tableC
. Os eventos são gravados em um conjunto de dados
no BigQuery.
A solicitação não inclui o parâmetro customerManagedEncryptionKey
. Portanto,
o sistema interno de gerenciamento de chaves do Google Cloud é usado para criptografar seus dados
em vez de CMEK.
O parâmetro backfillAll
associado à performance do histórico
(ou snapshot) está definido como um dicionário vazio ({}
), o que significa que
O Datastream preenche os dados históricos de todas as tabelas incluídas no stream.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Para mais informações sobre como usar o gcloud
para criar um stream, consulte a documentação do SDK Google Cloud.
Exemplo 2: excluir objetos específicos de um stream com uma origem PostgreSQL
Neste exemplo, você vai aprender a:
- Fazer streaming do PostgreSQL para o BigQuery
- Excluir objetos do fluxo
- Excluir objetos do preenchimento
O código a seguir mostra uma solicitação para criar um stream usado na transferência de dados de um banco de dados PostgreSQL de origem para o BigQuery. Ao criar um stream de um banco de dados PostgreSQL de origem, você precisa especificar mais dois campos específicos do PostgreSQL na solicitação:
replicationSlot
: um slot de replicação é um pré-requisito para configurar um banco de dados PostgreSQL para replicação. Você precisa criar um slot de replicação para cada stream.publication
: uma publicação é um grupo de tabelas de onde você quer replicar as mudanças. O nome da publicação precisa existir no banco de dados antes de iniciar um stream. No mínimo, a publicação precisa incluir as tabelas especificadas na listaincludeObjects
do stream.
O parâmetro backfillAll
associado à execução do preenchimento histórico (ou snapshot) está definido para excluir uma tabela.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Para mais informações sobre como usar o gcloud
para criar um stream, consulte a documentação do SDK Google Cloud.
Exemplo 3: especificar o modo de gravação somente de anexação para um stream
Ao fazer streaming para o BigQuery, é possível definir o modo de gravação: merge
ou
appendOnly
. Para mais informações, consulte Configurar o modo de gravação.
Se você não especificar o modo de gravação em sua solicitação para criar um stream, o padrão
O modo merge
é usado.
A solicitação a seguir mostra como definir o modo appendOnly
ao criar
um fluxo MySQL para BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Para mais informações sobre como usar gcloud
para criar um stream, consulte a documentação do SDK do Google Cloud.
Exemplo 4: fazer streaming para um destino do Cloud Storage
Neste exemplo, você vai aprender a:
- Fazer streaming do Oracle para o Cloud Storage
- Definir um conjunto de objetos a serem incluídos no stream
- Definir CMEK para criptografar dados em repouso
A solicitação a seguir mostra como criar um stream que grava os eventos em um bucket do Cloud Storage.
Nesta solicitação de exemplo, os eventos são gravados no formato de saída JSON, e um novo arquivo é criado a cada 100 MB ou 30 segundos, substituindo os valores padrão de 50 MB e 60 segundos.
Para o formato JSON, é possível:
Inclua um arquivo de esquema de tipos unificados no caminho. Como resultado, o Datastream grava dois arquivos no Cloud Storage: um arquivo de dados JSON e um arquivo de esquema Avro. O arquivo de esquema tem o mesmo nome do arquivo de dados, com uma extensão
.schema
.Ative a compactação gzip para que o Datastream compacte os arquivos gravados no Cloud Storage.
Com o uso do parâmetro backfillNone
, a solicitação especifica que somente mudanças em andamento são transmitidas para o destino, sem preenchimento.
A solicitação especifica o parâmetro de chave de criptografia gerenciada pelo cliente, que permite controlar as chaves usadas para criptografar dados em repouso em um projeto do Google Cloud. O parâmetro se refere à CMEK que o Datastream usa para criptografar dados transmitidos da origem para o destino. Ele também especifica o keyring da CMEK.
Para mais informações sobre keyrings, consulte Recursos do Cloud KMS. Para mais informações sobre como proteger seus dados usando chaves de criptografia, consulte Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Para mais informações sobre como usar o gcloud
para criar um stream, consulte a documentação do SDK Google Cloud.
Validar a definição de um stream
Antes de criar um stream, é possível validar a definição dele. Dessa forma, é possível garantir que todas as verificações de validação sejam aprovadas e que o stream seja executado com êxito quando criado.
Como validar verificações de stream:
- Indica se a fonte está configurada corretamente para permitir que o Datastream faça streaming de dados.
- Se a transmissão pode se conectar à origem e ao destino.
- A configuração completa do stream.
Para validar um stream, adicione &validate_only=true
ao URL anterior ao corpo da sua solicitação:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Depois de fazer essa solicitação, você verá as verificações de validação que o Datastream executa para sua origem e seu destino e se as verificações serão aprovadas ou falharão. Para qualquer verificação de validação que não seja aprovada, serão exibidas informações sobre o motivo da falha e o que fazer para corrigir o problema.
Por exemplo, suponha que você tenha uma chave de criptografia gerenciada pelo cliente (CMEK) e queira que o Datastream use para criptografar dados transmitidos da origem para o destino. Como parte da validação do stream, o Datastream verifica se a chave existe e se tem permissões para usá-la. Se uma dessas condições não for atendida, quando você validar o fluxo, a seguinte mensagem de erro será retornada:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Para resolver esse problema, verifique se a chave fornecida existe e se a conta de serviço do Datastream tem a permissão cloudkms.cryptoKeys.get
para a chave.
Depois de fazer as correções apropriadas, faça a solicitação novamente para garantir que todas as verificações de validação sejam aprovadas. Para o exemplo acima, a verificação CMEK_VALIDATE_PERMISSIONS
não retornará mais uma mensagem de erro, mas terá o status PASSED
.
Receber informações sobre um fluxo
O código a seguir mostra uma solicitação para recuperar informações sobre um stream. Essas informações incluem:
- O nome do stream (identificador exclusivo)
- Um nome fácil de usar para o fluxo (nome de exibição)
- Carimbos de data/hora de quando o stream foi criado e atualizado pela última vez
- Informações sobre os perfis de conexão de origem e de destino associados ao stream
- O estado do stream
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
A resposta é exibida da seguinte maneira:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Se quiser saber como usar o gcloud
para recuperar informações sobre sua transmissão, clique aqui.
Listar fluxos
O código a seguir mostra uma solicitação para recuperar uma lista de todos os streams na projeto e local especificados.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Se quiser saber como usar o gcloud
para recuperar informações de todos os seus streams, clique aqui.
Listar objetos de um fluxo
O código a seguir mostra uma solicitação para recuperar informações sobre todos os objetos de um stream.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Se quiser saber como usar o gcloud
para recuperar informações sobre todos os objetos do seu stream, clique aqui.
A lista de objetos retornada pode ser semelhante a esta:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Se quiser mais informações sobre como usar gcloud
para listar objetos de um stream, clique aqui.
Iniciar um stream
O código a seguir mostra uma solicitação para iniciar um fluxo.
Usando o parâmetro updateMask
na solicitação, apenas os campos que você especificar precisam ser incluídos no corpo da solicitação. Para iniciar um stream, altere o valor no campo state
de CREATED
para RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Se quiser mais informações sobre como usar o gcloud
para iniciar sua transmissão, clique aqui.
Pausar um stream
O código a seguir mostra uma solicitação para pausar um fluxo em execução.
Neste exemplo, o campo especificado no parâmetro updateMask
é state
. Ao pausar o stream, você muda o estado dele de RUNNING
para PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Para mais informações sobre como usar o gcloud
para pausar sua transmissão, clique aqui.
Retomar um stream
O código a seguir mostra uma solicitação para retomar um stream pausado.
Neste exemplo, o campo especificado no parâmetro updateMask
é state
. Ao retomar o fluxo, você muda o estado dele de PAUSED
de volta para RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Clique aqui para mais informações sobre como usar o gcloud
para retomar sua transmissão.
Recuperar uma transmissão
Recupere uma transmissão que falhou permanentemente usando o método RunStream
. Cada
tipo de banco de dados de origem tem a própria definição de quais operações de recuperação de stream
são possíveis. Para mais informações, consulte Recuperar uma transmissão.
Recuperar um stream de uma origem MySQL ou Oracle
Os exemplos de código a seguir mostram solicitações para recuperar um fluxo para MySQL ou Origem do Oracle em várias posições de arquivos de registros:
REST
Recuperar um stream da posição atual. Esta é a opção padrão:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recuperar um stream na próxima posição disponível:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Recupere um stream da posição mais recente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recupere um stream de uma posição específica (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Substitua:
- NAME_OF_THE_LOG_FILE: o nome do arquivo de registros que você quer usar para recuperar sua transmissão
- POSITION: a posição no arquivo de registros a partir da qual você quer recuperar sua transmissão. Se você não informar o valor, o Datastream vai recuperar o stream do cabeçalho do arquivo.
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recuperar um stream de uma posição específica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Para mais informações sobre as opções de recuperação disponíveis, consulte Recuperar uma transmissão.
gcloud
Não é possível recuperar um stream usando o gcloud
.
Recuperar um stream de uma origem do PostgreSQL
O exemplo de código a seguir mostra uma solicitação para recuperar um stream para um PostgreSQL fonte. Durante a recuperação, o stream começa a ler a partir da primeira sequência de registro (LSN) no slot de replicação configurado para o stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Se você quiser mudar o slot de replicação, atualize o fluxo com o novo nome do slot de replicação primeiro:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Não é possível recuperar um stream usando o gcloud
.
Recuperar um stream de uma origem do SQL Server
Os exemplos de código a seguir mostram exemplos de solicitações para recuperar um stream de uma solicitação Origem do servidor.
REST
Para recuperar um stream na primeira posição disponível:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupere um fluxo de um número de sequência de registro preferido:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Não é possível recuperar um fluxo usando gcloud
.
Iniciar ou retomar uma transmissão em uma posição específica
É possível iniciar uma transmissão ou retomar uma pausa em uma posição específica por Origens MySQL e Oracle. Isso pode ser útil quando você quer fazer o preenchimento usando uma ferramenta externa ou iniciar o CDC em uma posição indicada. Para uma origem do MySQL, é necessário indicar uma posição de binário. Para uma origem do Oracle, um número de alteração do sistema (SCN) no arquivo de registro de repetição.
O código a seguir mostra uma solicitação para iniciar ou retomar um stream já criado de uma posição específica.
Inicie ou retome um stream em uma posição específica do binlog (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Substitua:
- NAME_OF_THE_LOG_FILE: o nome do arquivo de registros que você quer usar para iniciar sua transmissão.
- POSITION: a posição no arquivo de registro em que você quer iniciar o fluxo. Se você não informar o valor, o Datastream vai começar a ler do cabeçalho do arquivo.
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
Não é possível iniciar ou retomar um stream de uma posição específica usando gcloud
. Para informações sobre como usar o gcloud
para iniciar ou retomar um stream, consulte a documentação do SDK Cloud.
Inicie ou retome um stream a partir de um número de alteração específico do sistema no arquivo de registro redo (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Exemplo:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Não é possível iniciar ou retomar um stream de uma posição específica usando gcloud
. Para mais informações sobre como usar o gcloud
para iniciar um stream, consulte a documentação do SDK Cloud.
Modificar um stream
O código a seguir mostra uma solicitação para atualizar a configuração de rotação de arquivos de um fluxo para alterná-lo a cada 75 MB ou 45 segundos.
Neste exemplo, os campos especificados para o parâmetro updateMask
incluem os campos fileRotationMb
e fileRotationInterval
, representados pelas sinalizações destinationConfig.gcsDestinationConfig.fileRotationMb
e destinationConfig.gcsDestinationConfig.fileRotationInterval
, respectivamente.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
O código a seguir mostra uma solicitação para incluir um arquivo de esquema de tipos unificados no caminho dos arquivos que o Datastream grava no Cloud Storage. Como resultado, o Datastream grava dois arquivos: um arquivo de dados JSON e um arquivo de esquema Avro.
Neste exemplo, o campo especificado é jsonFileFormat
, representado pela sinalização destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
O código a seguir mostra uma solicitação para que o Datastream replique dados existentes, além de alterações contínuas nos dados, do banco de dados de origem para o destino.
A seção oracleExcludedObjects
do código mostra as tabelas e esquemas que não podem ser preenchidos no destino.
Neste exemplo, todas as tabelas e esquemas serão preenchidos, exceto a tableA no schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Se quiser mais informações sobre como usar o gcloud
para modificar sua transmissão, clique aqui.
Iniciar o preenchimento de um objeto de um fluxo
Um stream no Datastream pode preencher dados históricos e fazer streaming de alterações em andamento para um destino. As alterações em andamento serão sempre transmitidas de uma origem para um destino. No entanto, é possível especificar se você quer que os dados históricos sejam transmitidos.
Se você quiser que os dados históricos sejam transmitidos da origem para o destino, use o parâmetro backfillAll
.
O Datastream também permite transmitir dados históricos apenas para tabelas de banco de dados específicas. Para fazer isso, use o parâmetro backfillAll
e exclua as tabelas para as quais você não quer dados históricos.
Se você quiser que apenas as alterações em andamento sejam transmitidas no destino, use o parâmetro backfillNone
. Se você quiser que o Datastream transmita um snapshot de todos os dados existentes da origem para o destino, inicie o preenchimento manualmente para os objetos que contêm esses dados.
Outro motivo para iniciar o preenchimento de um objeto é se os dados não estiverem sincronizados entre a origem e o destino. Por exemplo, um usuário pode excluir dados no destino acidentalmente, e os dados serão perdidos. Nesse caso, o preenchimento do objeto será iniciado como um "mecanismo de redefinição", porque todos os dados são transmitidos para o destino em uma única captura. Como resultado, os dados são sincronizados entre a origem e o destino.
Antes de iniciar o preenchimento de um objeto de um stream, você precisa recuperar informações sobre o objeto.
Cada objeto tem um OBJECT_ID, que identifica o objeto de maneira exclusiva. Use o OBJECT_ID para iniciar o preenchimento do stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Para mais informações sobre como usar gcloud
para iniciar o preenchimento de um objeto de stream, consulte a documentação do SDK Google Cloud.
Interromper o preenchimento de um objeto de um fluxo
Depois de iniciar o preenchimento de um objeto de um stream, você pode interromper o preenchimento do objeto. Por exemplo, se um usuário modificar um esquema de banco de dados, o esquema ou os dados poderão estar corrompidos. Você não quer que o esquema ou os dados sejam transmitidos para o destino, então o preenchimento do objeto é interrompido.
Também é possível interromper o preenchimento de um objeto para fins de balanceamento de carga. O Datastream pode executar vários preenchimentos em paralelo. Isso pode sobrecarregar a origem. Se a carga for significativa, interrompa o preenchimento de cada objeto e, em seguida, inicie o preenchimento para os objetos, um por um.
Antes de interromper o preenchimento de um objeto de um stream, é preciso fazer uma solicitação para recuperar informações sobre todos os objetos de um stream. Cada objeto retornado tem um OBJECT_ID, que identifica o objeto de maneira exclusiva. Use o OBJECT_ID para interromper o preenchimento do stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Para mais informações sobre como usar gcloud
para interromper o preenchimento de um objeto do seu stream, clique aqui.
Alterar o número máximo de tarefas CDC simultâneas
O código a seguir mostra como definir o número máximo de tarefas captura de dados alterados simultâneos (CDC, na sigla em inglês) para um stream do MySQL como 7.
Neste exemplo, o campo especificado no parâmetro updateMask
é maxConcurrentCdcTasks
. Ao definir o valor como 7, você muda o número máximo de tarefas de CDC simultâneas do valor anterior para 7. É possível usar valores de 0 a 50 (inclusive). Se você não definir o valor ou se definir como 0, o padrão do sistema de cinco tarefas será definido para o stream.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Para mais informações sobre como usar o gcloud
, clique aqui.
Alterar o número máximo de tarefas de preenchimento simultâneas
O código a seguir mostra como definir o número máximo de tarefas de preenchimento simultâneas para um fluxo do MySQL como 25.
Neste exemplo, o campo especificado no parâmetro updateMask
é maxConcurrentBackfillTasks
. Ao definir o valor como 25, você altera o número máximo de tarefas de preenchimento simultâneas do valor anterior para 25. Você pode usar valores de 0 a 50 (inclusive). Se você não definir o valor ou definir como 0, o padrão do sistema de 16 tarefas será definido para o stream.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Para mais informações sobre como usar o gcloud
, clique aqui.
Ativar streaming de objetos grandes para fontes Oracle
É possível ativar o streaming de objetos grandes, como objetos binários grandes (BLOB
),
objetos grandes de caracteres (CLOB
) e objetos grandes de caracteres nacionais (NCLOB
)
para streams com origens Oracle. A flag streamLargeObjects
permite incluir
objetos grandes em streams novos e existentes. A flag é definida no nível do stream,
não é preciso especificar as colunas de tipos de dados de objetos grandes.
O exemplo a seguir mostra como criar um stream que permite fazer transmissões objetos.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Para mais informações sobre como usar gcloud
para atualizar um stream, consulte a documentação do SDK Google Cloud.
Excluir um stream
O código a seguir mostra uma solicitação para excluir um stream.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Para mais informações sobre como usar o gcloud
para excluir sua transmissão, clique aqui.
A seguir
- Saiba como usar a API Datastream para gerenciar perfis de conexão.
- Saiba como usar a API Datastream para gerenciar configurações de conectividade particular.
- Para mais informações sobre como usar a API Datastream, consulte a documentação de referência.