In questa pagina scoprirai come utilizzare l'API Datastream per:
- Creare stream
- Visualizzare informazioni su stream e oggetti stream
- Aggiornare gli stream avviandoli, mettendoli in pausa, riprendendoli e modificandoli, nonché avviando e interrompendo il backfill per gli oggetti stream
- Recuperare gli stream non riusciti definitivamente
- Attivare lo streaming di oggetti di grandi dimensioni per gli stream Oracle
- Eliminare gli stream
Puoi utilizzare l'API Datastream in due modi. Puoi effettuare chiamate all'API REST o utilizzare Google Cloud CLI (interfaccia a riga di comando).
Per informazioni di alto livello sull'utilizzo di gcloud
per gestire gli stream di Datastream, consulta Stream di gcloud Datastream.
Creazione di un flusso
In questa sezione imparerai a creare uno stream utilizzato per trasferire i dati dall'origine a una destinazione. Gli esempi riportati di seguito non sono esaustivi, ma mettono in evidenza funzionalità specifiche di Datastream. Per risolvere il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento dell'API Datastream.
Questa sezione illustra i seguenti casi d'uso:
- Riproduci in streaming da Oracle a Cloud Storage
- Eseguire lo streaming da MySQL a BigQuery
- Eseguire lo streaming da PostgreSQL a BigQuery
- Definire un insieme di oggetti da includere nello stream
- Eseguire il backfill di tutti gli oggetti inclusi nello stream
- Escludi oggetti dallo stream
- Escludi oggetti dal backfill
- Definire la chiave CMEK per la crittografia dei dati at-rest
- Definire la modalità di scrittura per uno stream
Esempio 1: trasmetti oggetti specifici in streaming a BigQuery
In questo esempio imparerai a:
- Esegui il flusso da MySQL a BigQuery
- Includi un insieme di oggetti nello stream
- Definisci la modalità di scrittura per lo stream come di sola aggiunta
- Esegui il backfill di tutti gli oggetti inclusi nello stream
Di seguito è riportata una richiesta di estrazione di tutte le tabelle da schema1
e di due tabelle specifiche da schema2
: tableA
e tableC
. Gli eventi vengono scritti in un set di dati
in BigQuery.
La richiesta non include il parametro customerManagedEncryptionKey
, pertanto per criptare i dati viene utilizzato il sistema di gestione delle chiavi interno di Google Cloud anziché CMEK.
Il parametro backfillAll
associato all'esecuzione del backfill (o dello snapshot) storico è impostato su un dizionario vuoto ({}
), il che significa che Datastream esegue il backfill dei dati storici di tutte le tabelle incluse nello stream.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 2: escludi oggetti specifici da uno stream con un'origine PostgreSQL
In questo esempio imparerai a:
- Esegui il flusso da PostgreSQL a BigQuery
- Escludere oggetti dallo stream
- Escludere oggetti dal backfill
Il codice seguente mostra una richiesta di creazione di uno stream utilizzato per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei uno stream da un database PostgreSQL di origine, devi specificare nella richiesta altri due campi specifici di PostgreSQL:
replicationSlot
: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni stream.publication
: una pubblicazione è un gruppo di tabelle di cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare uno stream. La pubblicazione deve includere almeno le tabelle specificate nell'elencoincludeObjects
dello stream.
Il parametro backfillAll
associato all'esecuzione del backfill storico (o dello snapshot) è impostato per escludere una tabella.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 3: specifica la modalità di scrittura di sola aggiunta per uno stream
Quando esegui lo streaming in BigQuery, puoi definire la modalità di scrittura: merge
o
appendOnly
. Per ulteriori informazioni, consulta Configurare la modalità di scrittura.
Se non specifichi la modalità di scrittura nella richiesta per creare uno stream, viene utilizzata la modalità predefinitamerge
.
La richiesta seguente mostra come definire la modalità appendOnly
quando crei un flusso da MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 4: streaming in una destinazione Cloud Storage
In questo esempio imparerai a:
- Stream da Oracle a Cloud Storage
- Definisci un insieme di oggetti da includere nello stream
- Definire la chiave CMEK per la crittografia dei dati at-rest
La richiesta seguente mostra come creare uno stream che scrive gli eventi in un bucket in Cloud Storage.
In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (sostituendo i valori predefiniti di 50 MB e 60 secondi).
Per il formato JSON, puoi:
Includi un file di schema di tipi unificati nel percorso. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file dello schema ha lo stesso nome del file di dati, con estensione
.schema
.Attiva la compressione gzip per consentire a Datastream di comprimere i file scritti in Cloud Storage.
Se utilizzi il parametro backfillNone
, la richiesta specifica che solo le modifiche in corso vengono importate nella destinazione in streaming, senza backfill.
La richiesta specifica il parametro della chiave di crittografia gestita dal cliente che ti consente di controllare le chiavi utilizzate per criptare i dati inattivi all'interno di un progetto Google Cloud. Il parametro si riferisce al CMEK utilizzato da Datastream per criptare i dati trasmessi in streaming dall'origine alla destinazione. Specifica anche il keyring per la chiave CMEK.
Per ulteriori informazioni sui keyring, consulta le risorse Cloud KMS. Per ulteriori informazioni sulla protezione dei dati mediante chiavi di crittografia, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Convalida la definizione di uno stream
Prima di creare uno stream, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida vengano superati e che lo stream venga eseguito correttamente al momento della creazione.
La convalida di uno stream verifica:
- Indica se l'origine è configurata correttamente per consentire a Datastream di trasmettere i dati.
- Indica se lo stream può connettersi sia all'origine che alla destinazione.
- La configurazione end-to-end dello stream.
Per convalidare uno stream, aggiungi &validate_only=true
all'URL che precede il corpo della richiesta:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Dopo aver effettuato questa richiesta, vedrai i controlli di convalida eseguiti da Datastream per l'origine e la destinazione, nonché se i controlli sono stati superati o meno. Per qualsiasi controllo di convalida non superato, vengono visualizzate informazioni sul motivo dell'errore e su cosa fare per correggere il problema.
Ad esempio, supponiamo che tu abbia una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i dati in streaming dall'origine alla destinazione. Nell'ambito della convalida dello stream, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzarla. Se una di queste condizioni non viene soddisfatta, quando convalidi lo stream viene restituito il seguente messaggio di errore:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Per risolvere il problema, verifica che la chiave che hai fornito esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get
per la chiave.
Dopo aver apportato le correzioni del caso, invia di nuovo la richiesta per assicurarti che tutti i controlli di convalida vengano superati. Nell'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS
non restituirà più un messaggio di errore, ma avrà uno stato PASSED
.
Ricevere informazioni su uno stream
Il seguente codice mostra una richiesta per recuperare informazioni su uno stream. Queste informazioni comprendono:
- Il nome dello stream (identificatore univoco)
- Un nome intuitivo per lo stream (nome visualizzato)
- Timestamp della creazione e dell'ultimo aggiornamento dello stream
- Informazioni sui profili di connessione di origine e di destinazione associati allo stream
- Lo stato dello stream
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Viene visualizzata la risposta, come segue:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni sul tuo stream, fai clic qui.
Elenco stream
Il codice seguente mostra una richiesta per recuperare un elenco di tutti gli stream nel progetto e nella località specificati.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni su tutti i tuoi stream, fai clic qui.
Elenca gli oggetti di uno stream
Il codice seguente mostra una richiesta per recuperare informazioni su tutti gli oggetti di uno stream.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni su tutti gli oggetti dello stream, fai clic qui.
L'elenco di oggetti restituiti potrebbe essere simile al seguente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per elencare gli oggetti di uno stream, fai clic qui.
Avvio di un flusso
Il seguente codice mostra una richiesta di avvio di uno stream.
Se utilizzi il parametro updateMask
nella richiesta, nel corpo della richiesta devono essere inclusi solo i campi specificati. Per avviare uno stream, modifica il valore nel campo state
da CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per avviare lo stream, fai clic qui.
Pausa di un flusso
Il seguente codice mostra una richiesta di messa in pausa di uno stream in esecuzione.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo state
. Mettendo in pausa lo stream, ne cambi lo stato da RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per mettere in pausa lo stream, fai clic qui.
Ripresa di un flusso
Il seguente codice mostra una richiesta di ripresa di uno stream in pausa.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo state
. Se riprendi lo stream, ne cambi lo stato da PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per riprendere lo stream, fai clic qui.
Recuperare uno stream
Puoi recuperare uno stream con errore permanente utilizzando il metodo RunStream
. Ogni
tipo di database di origine ha la propria definizione delle operazioni di recupero dello stream
possibili. Per ulteriori informazioni, vedi Ripristinare uno stream.
Recuperare uno stream per un'origine MySQL o Oracle
I seguenti esempi di codice mostrano le richieste per recuperare uno stream per un'origine MySQL o Oracle da varie posizioni del file di log:
REST
Recupera uno stream dalla posizione corrente. Questa è l'opzione predefinita:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recuperare uno stream dalla posizione disponibile successiva:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Recuperare uno stream dalla posizione più recente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recuperare uno stream da una posizione specifica (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file log da cui vuoi recuperare lo stream
- POSITION: la posizione nel file log da cui vuoi recuperare lo stream. Se non fornisci il valore, Datastream recupera lo stream dall'inizio del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recuperare uno stream da una posizione specifica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Per ulteriori informazioni sulle opzioni di recupero disponibili, consulta Recuperare uno stream.
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recuperare uno stream per un'origine PostgreSQL
Il seguente esempio di codice mostra una richiesta di recupero di uno stream per un'origine PostgreSQL. Durante il recupero, lo stream inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Se vuoi modificare lo slot di replica, aggiorna prima lo stream con il nuovo nome dello slot di replica:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recuperare uno stream per un'origine SQL Server
I seguenti esempi di codice mostrano richieste di esempio per recuperare uno stream per un'origine SQL Server.
REST
Ripristina uno stream dalla prima posizione disponibile:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupera uno stream da un numero di sequenza di log preferito:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Avviare o riprendere uno stream da una posizione specifica
Puoi avviare uno stream o riprendere uno stream in pausa da una posizione specifica per le origini MySQL e Oracle. Questa operazione può essere utile quando vuoi eseguire il backfill utilizzando uno strumento esterno o avviare la CDC da una posizione da te indicata. Per un'origine MySQL, devi indicare una posizione del log binario, per un'origine Oracle, un numero della modifica di sistema (SCN) nel file del log di ripristino.
Il codice seguente mostra una richiesta di avvio o ripresa di uno stream già creato da una posizione specifica.
Avvia o riprendi uno stream da una posizione del file binlog specifica (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file log da cui vuoi avviare lo stream.
- POSITION: la posizione nel file di log da cui vuoi avviare lo stream. Se non fornisci il valore, Datastream inizia a leggere dall'inizio del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.
Avvia o riprendi uno stream da un numero della modifica di sistema specifico nel file di log di ripetizione (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare uno stream, consulta la documentazione di Cloud SDK.
Modifica di un flusso
Il codice seguente mostra una richiesta di aggiornamento della configurazione della rotazione dei file di uno stream per ruotare il file ogni 75 MB o 45 secondi.
Per questo esempio, i campi specificati per il parametro updateMask
includono i campi fileRotationMb
e fileRotationInterval
, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb
e destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Il codice seguente mostra una richiesta di inclusione di un file di schema di tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.
Per questo esempio, il campo specificato è il campo jsonFileFormat
, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Il codice seguente mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche in corso ai dati, dal database di origine a quello di destinazione.
La sezione oracleExcludedObjects
del codice mostra le tabelle e gli schemi di cui è vietato il backfill nella destinazione.
Per questo esempio, verrà eseguito il backfill di tutte le tabelle e di tutti gli schemi, ad eccezione della tabella A nello schema 3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per modificare lo stream, fai clic qui.
Avvia il backfill per un oggetto di uno stream
Uno stream in Datastream può eseguire il backfill dei dati storici, nonché trasmettere le modifiche in corso in una destinazione. Le modifiche in corso verranno sempre trasmesse in streaming da un'origine a una destinazione. Tuttavia, puoi specificare se vuoi che i dati storici vengano trasmessi in streaming.
Se vuoi che i dati storici vengano trasmessi in streaming dall'origine alla destinazione, utilizza il parametro backfillAll
.
Datastream ti consente anche di eseguire lo streaming dei dati storici solo per tabelle di database specifiche. A questo scopo, utilizza il parametro backfillAll
ed escludi le tabelle per le quali non vuoi i dati storici.
Se vuoi che solo le modifiche in corso vengano trasmesse in streaming alla destinazione, utilizza il parametro backfillNone
. Se vuoi che Datastream trasmetta un flusso di uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.
Un altro motivo per avviare il backfill di un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente i dati nella destinazione, che vengono quindi persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di reimpostazione" perché tutti i dati vengono trasmessi in streaming alla destinazione in un solo passaggio. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.
Prima di poter avviare il backfill per un oggetto di uno stream, devi recuperare le informazioni sull'oggetto.
Ogni oggetto ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per avviare il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per avviare il backfill di un oggetto dello stream, consulta la documentazione di Google Cloud SDK.
Interrompere il backfill per un oggetto di uno stream
Dopo aver avviato il backfill per un oggetto di uno stream, puoi interromperlo. Ad esempio, se un utente modifica lo schema di un database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi in streaming alla destinazione, quindi interrompi il backfill per l'oggetto.
Puoi anche interrompere il backfill di un oggetto per il bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questa operazione potrebbe applicare un carico supplementare sull'origine. Se il carico è elevato, interrompi il backfill per ogni oggetto e poi avvialo per gli oggetti uno alla volta.
Prima di poter interrompere il backfill per un oggetto di uno stream, devi inviare una richiesta per recuperare le informazioni su tutti gli oggetti di uno stream. Ogni oggetto restituito ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per interrompere il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per interrompere il backfill di un oggetto dello stream, fai clic qui.
Modificare il numero massimo di attività CDC simultanee
Il seguente codice mostra come impostare il numero massimo di attività CDC (Change Data Capture) concorrenti per uno stream MySQL su 7.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo maxConcurrentCdcTasks
. Impostando il valore su 7, modifichi il numero di attività CDC contemporanee massime dal valore precedente a 7. Puoi utilizzare valori compresi tra 0 e 50 (incluso). Se non definisci il valore o lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 5 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
, fai clic qui.
Modificare il numero massimo di attività di backfill simultanee
Il seguente codice mostra come impostare il numero massimo di attività di backfill contemporanee per uno stream MySQL su 25.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo maxConcurrentBackfillTasks
. Impostando il valore su 25, modifichi il numero di attività di backfill contemporanee massime dal valore precedente a 25. Puoi utilizzare valori compresi tra 0 e 50 (incluso). Se non definisci il valore o lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 16 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
, fai clic qui.
Attivare lo streaming di oggetti di grandi dimensioni per le origini Oracle
Puoi attivare lo streaming di oggetti di grandi dimensioni, come oggetti di grandi dimensioni binari (BLOB
),
oggetti di grandi dimensioni con caratteri (CLOB
) e oggetti di grandi dimensioni con caratteri nazionali (NCLOB
)
per gli stream con origini Oracle. Il flag streamLargeObjects
ti consente di includere oggetti di grandi dimensioni sia negli stream nuovi che esistenti. Il flag viene impostato a livello di stream,
non è necessario specificare le colonne dei tipi di dati di oggetti di grandi dimensioni.
L'esempio seguente mostra come creare uno stream che ti consenta di trasmettere oggetti di grandi dimensioni.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.
Eliminazione di un flusso
Il seguente codice mostra una richiesta di eliminazione di uno stream.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per eliminare lo stream, fai clic qui.
Passaggi successivi
- Scopri come utilizzare l'API Datastream per gestire i profili di connessione.
- Scopri come utilizzare l'API Datastream per gestire le configurazioni di connettività privata.
- Per ulteriori informazioni sull'utilizzo dell'API Datastream, consulta la documentazione di riferimento.