Panoramica
In questa sezione imparerai a utilizzare l'API Datastream per:
- Crea flussi
- Recupero informazioni su flussi e oggetti flusso
- Aggiorna i flussi avviandoli, mettendoli in pausa, ripristinandoli e modificandoli, nonché avviando e interrompendo il backfill per gli oggetti dei flussi
- Recupera gli stream non riusciti definitivamente
- Abilita flusso di oggetti di grandi dimensioni per i flussi Oracle
- Elimina flussi
Puoi utilizzare l'API Datastream in due modi. Puoi effettuare chiamate API REST o puoi utilizzare Google Cloud CLI (CLI).
Per informazioni generali sull'utilizzo di gcloud
per gestire i flussi di dati di Datastream, consulta gcloud Datastream Stream.
Creazione di un flusso
In questa sezione imparerai a creare uno stream utilizzato per trasferire i dati dall'origine a una destinazione. Gli esempi riportati di seguito non sono esaustivi, ma mettono in evidenza funzionalità specifiche di Datastream. Per gestire il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento API di Datastream.
Questa sezione riguarda i seguenti casi d'uso:
- Flusso di dati da Oracle a Cloud Storage
- Flusso di dati da MySQL a BigQuery
- Flusso di dati da PostgreSQL a BigQuery
- Definisci un insieme di oggetti da includere nello stream
- Esegui il backfill di tutti gli oggetti inclusi nello stream
- Escludi oggetti dallo stream
- Escludere gli oggetti dal backfill
- Definisci una CMEK per la crittografia dei dati at-rest
- Definire la modalità di scrittura per un flusso
Esempio 1: trasmettere oggetti specifici in BigQuery
In questo esempio, imparerai a:
- Trasmissione di flussi da MySQL a BigQuery
- Includi un insieme di oggetti nel flusso
- Definisci la modalità di scrittura per il flusso come solo aggiunta
- Esegui il backfill di tutti gli oggetti inclusi nel flusso
Di seguito è riportata una richiesta per eseguire il pull di tutte le tabelle da schema1
e di due tabelle specifiche
tabelle da schema2
: tableA
e tableC
. Gli eventi vengono scritti in un set di dati
in BigQuery.
La richiesta non include il parametro customerManagedEncryptionKey
, pertanto
il sistema di gestione delle chiavi interno di Google Cloud viene utilizzato per criptare i dati
anziché CMEK.
Il parametro backfillAll
associato all'esecuzione dello storico
il backfill (o snapshot) è impostato su un dizionario vuoto ({}
), il che significa che
Datastream esegue il backfill dei dati storici da tutte le tabelle incluse nel flusso.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 2: escludere oggetti specifici da un flusso con un'origine PostgreSQL
In questo esempio, imparerai a:
- Trasmetti il flusso da PostgreSQL a BigQuery
- Escludi oggetti dal flusso
- Escludere oggetti dal backfill
Il codice seguente mostra una richiesta di creazione di uno stream utilizzato per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei un flusso da un database PostgreSQL di origine, nella richiesta devi specificare due campi aggiuntivi specifici per PostgreSQL:
replicationSlot
: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni flusso.publication
: una pubblicazione è un gruppo di tabelle da cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare un flusso. La pubblicazione deve includere almeno le tabelle specificate nell'elencoincludeObjects
del flusso.
Il parametro backfillAll
associato all'esecuzione del backfill (o snapshot) storico è impostato per escludere una tabella.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 3: specifica la modalità di scrittura di sola aggiunta per un flusso
Quando esegui un flusso di dati in BigQuery, puoi definire la modalità di scrittura: merge
oppure
appendOnly
. Per ulteriori informazioni, vedi Configurare la modalità di scrittura.
Se non specifichi la modalità di scrittura nella richiesta per creare uno stream, viene utilizzata la modalità predefinitamerge
.
La richiesta seguente mostra come definire la modalità appendOnly
quando crei un stream da MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 4: inserimento di flussi in una destinazione Cloud Storage
In questo esempio, imparerai a:
- Trasmetti il flusso da Oracle a Cloud Storage
- Definisci un insieme di oggetti da includere nello stream
- Definire la chiave CMEK per la crittografia dei dati at-rest
La richiesta seguente mostra come creare un flusso che scriva gli eventi in un bucket in Cloud Storage.
In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (esegue l'override dei valori predefiniti di 50 MB e 60 secondi).
Per il formato JSON, puoi:
Includi nel percorso un file di schema dei tipi unificati. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file di schema ha lo stesso nome del file di dati, con estensione
.schema
.Abilita la compressione gzip per fare in modo che Datastream comprima i file scritti in Cloud Storage.
Utilizzando il parametro backfillNone
, la richiesta specifica che solo le modifiche in corso vengono trasmesse alla destinazione, senza backfill.
La richiesta specifica il parametro della chiave di crittografia gestita dal cliente, che consente di controllare le chiavi utilizzate per criptare i dati at-rest all'interno di un progetto Google Cloud. Il parametro si riferisce alla CMEK utilizzata da Datastream per criptare i dati trasmessi in flusso dall'origine alla destinazione. Specifica inoltre il keyring per la tua CMEK.
Per ulteriori informazioni sui keyring, vedi Risorse di Cloud KMS. Per ulteriori informazioni sulla protezione dei dati mediante chiavi di crittografia, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Convalida la definizione di un flusso
Prima di creare un flusso, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida vengano superati e che lo stream venga eseguito correttamente al momento della creazione.
Convalida dei controlli di uno stream:
- Se l'origine è configurata correttamente per consentire a Datastream di trasmettere dati dal flusso.
- Indica se lo stream può connettersi sia all'origine che alla destinazione.
- La configurazione end-to-end del flusso.
Per convalidare uno stream, aggiungi &validate_only=true
all'URL che precede il corpo della richiesta:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Dopo aver effettuato questa richiesta, vedrai i controlli di convalida eseguiti da Datastream per l'origine e la destinazione, insieme all'esito dei controlli. In caso di mancato superamento di un controllo di convalida, vengono visualizzate informazioni sul motivo della mancata riuscita e su cosa fare per correggere il problema.
Ad esempio, supponiamo che tu abbia una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i flussi di dati trasmessi dall'origine alla destinazione. Nell'ambito della convalida del flusso, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzare la chiave. Se una di queste condizioni non viene soddisfatta, quando convalidi il flusso, verrà restituito il seguente messaggio di errore:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Per risolvere il problema, verifica che la chiave fornita esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get
per la chiave.
Dopo aver apportato le correzioni del caso, invia nuovamente la richiesta per garantire il superamento di tutti i controlli di convalida. Nell'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS
non restituirà più un messaggio di errore, ma avrà lo stato PASSED
.
Recuperare informazioni su uno stream
Il codice seguente mostra una richiesta di recupero delle informazioni su un flusso. Queste informazioni comprendono:
- Il nome dello stream (identificatore univoco)
- Un nome semplice per lo stream (nome visualizzato)
- Timestamp della creazione e dell'ultimo aggiornamento del flusso
- Informazioni sui profili di connessione di origine e di destinazione associati al flusso
- Lo stato dello stream
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Viene visualizzata la risposta, come segue:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni sul tuo stream, fai clic qui.
Elenco flussi
Il codice seguente mostra una richiesta per recuperare un elenco di tutti i flussi in progetto e località specificati.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni su tutti i tuoi stream, fai clic qui.
Elenco degli oggetti di un flusso
Il codice seguente mostra una richiesta di recupero delle informazioni su tutti gli oggetti di un flusso.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni su tutti gli oggetti dello stream, fai clic qui.
L'elenco di oggetti che viene restituito potrebbe essere simile al seguente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per elencare gli oggetti di uno stream, fai clic qui.
Avvio di un flusso
Il seguente codice mostra una richiesta di avvio di uno stream.
Se utilizzi il parametro updateMask
nella richiesta, solo i campi specificati devono essere inclusi nel corpo della richiesta. Per avviare uno stream, modifica il valore nel campo state
da CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per avviare lo stream, fai clic qui.
Pausa di un flusso
Il codice seguente mostra una richiesta di mettere in pausa un flusso in esecuzione.
Per questo esempio, il campo specificato per il parametro updateMask
è state
. Se metti in pausa lo stream, cambi il suo stato da RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per mettere in pausa lo stream, fai clic qui.
Ripresa di un flusso
Il codice seguente mostra una richiesta per riprendere uno stream in pausa.
Per questo esempio, il campo specificato per il parametro updateMask
è state
. Se riprendi lo stream, ne cambi lo stato da PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per riprendere lo streaming, fai clic qui.
Recuperare uno stream
Puoi recuperare un flusso di dati non riusciti definitivamente utilizzando il metodo RunStream
. Ogni
tipo di database di origine ha la propria definizione delle operazioni di recupero dello stream
possibili. Per ulteriori informazioni, consulta Recuperare uno stream.
Recupera un flusso per un'origine MySQL o Oracle
I seguenti esempi di codice mostrano le richieste di recupero di un flusso per un database MySQL o Origine Oracle da varie posizioni dei file di log:
REST
Recupera uno stream dalla posizione corrente. Questa è l'opzione predefinita:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recupera uno stream dalla successiva posizione disponibile:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Recupera uno stream dalla posizione più recente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recuperare uno stream da una posizione specifica (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi per recuperare il flusso
- POSITION: la posizione nel file di log da cui vuoi eseguire recuperare il flusso. Se non fornisci il valore, Datastream recupera il flusso dall'intestazione del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recupera un flusso da una posizione specifica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Per ulteriori informazioni sulle opzioni di recupero disponibili, vedi Recuperare uno stream.
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recuperare uno stream per un'origine PostgreSQL
Il seguente esempio di codice mostra una richiesta di recupero di uno stream per un'origine PostgreSQL. Durante il recupero, lo stream inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Se vuoi cambiare lo slot di replica, aggiorna il flusso con il nuovo nome dello slot di replica:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recupera un flusso per un'origine SQL Server
I seguenti esempi di codice mostrano richieste di esempio per recuperare un flusso per un Origine server.
REST
Recupera uno stream dalla prima posizione disponibile:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupera uno stream da un numero di sequenza di log preferito:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Avviare o riprendere uno stream da una posizione specifica
Puoi avviare o riprendere uno stream in pausa da una posizione specifica per Origini MySQL e Oracle. Questa operazione potrebbe essere utile quando vuoi eseguire il backfill utilizzando uno strumento esterno o avviare CDC da una posizione da te indicata. Per un Origine MySQL, devi indicare una posizione binlog; per un'origine Oracle, un System Change Number (SCN) nel file di log Ripetizione.
Il codice seguente mostra una richiesta per avviare o riprendere un flusso già creato da una posizione specifica.
Avvia o riprendi un flusso da una posizione binlog specifica (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi per avviare lo stream.
- POSITION: la posizione nel file di log da cui vuoi iniziare. nel tuo stream. Se non fornisci il valore, Datastream inizia a leggere dall'intestazione del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.
Avvia o riprendi un flusso da un numero di modifica di sistema specifico nel file di log di ripetizione (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Non è possibile avviare o riprendere uno stream da una posizione specifica utilizzando gcloud
. Per informazioni sull'utilizzo di gcloud
per avviare uno stream, consulta la documentazione di Cloud SDK.
Modifica di un flusso
Il codice seguente mostra una richiesta di aggiornamento della configurazione della rotazione di file di un flusso per ruotare il file ogni 75 MB o 45 secondi.
In questo esempio, i campi specificati per il parametro updateMask
includono i campi fileRotationMb
e fileRotationInterval
, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb
e destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Il codice seguente mostra una richiesta per includere un file di schema tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.
Per questo esempio, il campo specificato è il campo jsonFileFormat
, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Il codice seguente mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche in corso ai dati, dal database di origine alla destinazione.
La sezione oracleExcludedObjects
del codice mostra le tabelle e gli schemi di cui è vietato il backfill nella destinazione.
In questo esempio verrà eseguito il backfill di tutte le tabelle e di tutti gli schemi, ad eccezione della tabellaA in schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per modificare lo stream, fai clic qui.
Avvia backfill per un oggetto di un flusso
Un flusso in Datastream può eseguire il backfill dei dati storici e trasmettere le modifiche in corso in una destinazione. Le modifiche in corso verranno sempre trasmesse in flusso da un'origine a una destinazione. Tuttavia, puoi specificare se desideri che i dati storici vengano trasmessi in flusso.
Se vuoi che i dati storici vengano trasmessi in flusso dall'origine alla destinazione, utilizza il parametro backfillAll
.
Datastream consente inoltre di trasmettere dati storici solo per tabelle di database specifiche. Per farlo, utilizza il parametro backfillAll
ed escludi le tabelle per le quali non vuoi dati storici.
Se vuoi che nella destinazione vengano trasmesse in flusso solo le modifiche in corso, utilizza il parametro backfillNone
. Se vuoi che Datastream invii uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.
Un altro motivo per avviare il backfill di un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente i dati nella destinazione, che ora andranno persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di ripristino" perché tutti i dati vengono inviati in flusso nella destinazione con un'unica inquadratura. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.
Prima di poter avviare il backfill per un oggetto di un flusso, devi recuperare le informazioni sull'oggetto.
Ogni oggetto ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per avviare il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per avviare il backfill per un oggetto del tuo flusso, consulta la documentazione di Google Cloud SDK.
Arresta il backfill per un oggetto di un flusso
Dopo aver avviato il backfill per un oggetto di un flusso di dati, puoi arrestarlo per l'oggetto. Ad esempio, se un utente modifica lo schema di un database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi in flusso nella destinazione, quindi interrompi il backfill per l'oggetto.
Puoi anche arrestare il backfill per un oggetto ai fini del bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questo potrebbe aumentare il carico sull'origine. Se il carico è significativo, interrompi il backfill per ogni oggetto e poi avvialo per gli oggetti, uno alla volta.
Prima di poter arrestare il backfill per un oggetto di un flusso, devi richiedere una richiesta per recuperare le informazioni su tutti gli oggetti di un flusso. Ogni oggetto restituito ha un valore OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per interrompere il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per interrompere il backfill di un oggetto dello stream, fai clic qui.
Modifica il numero massimo di attività CDC simultanee
Il seguente codice mostra come impostare il numero massimo di attività CDC (Change Data Capture) massime per un flusso MySQL su 7.
Per questo esempio, il campo specificato per il parametro updateMask
è maxConcurrentCdcTasks
. Se imposti il valore su 7, cambi il numero massimo di attività CDC simultanee dal valore precedente a 7. Puoi utilizzare valori compresi tra 0 e 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per il flusso viene impostata l'impostazione predefinita di sistema di 5 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
, fai clic qui.
Modificare il numero massimo di attività di backfill simultanee
Il codice seguente mostra come impostare il numero massimo di attività di backfill simultanee per un flusso MySQL su 25.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo maxConcurrentBackfillTasks
. Se imposti il valore su 25, modifichi il numero massimo di attività di backfill simultanee dal valore precedente a 25. Puoi utilizzare valori compresi tra 0 e 50 (incluso). Se non definisci il valore o se lo definisci come 0, per il flusso viene impostata l'impostazione predefinita di sistema di 16 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
, fai clic qui.
Abilita flusso di oggetti di grandi dimensioni per le origini Oracle
Puoi abilitare i flussi di dati di oggetti di grandi dimensioni, come oggetti binari di grandi dimensioni (BLOB
),
oggetti di grandi dimensioni con caratteri (CLOB
) e oggetti di grandi dimensioni con caratteri nazionali (NCLOB
)
per i flussi con origini Oracle. Il flag streamLargeObjects
ti consente di includere
sia in flussi nuovi che esistenti. Il flag è impostato a livello di stream,
non devi specificare le colonne
dei tipi di dati di oggetti di grandi dimensioni.
L'esempio seguente mostra come creare uno stream che ti consenta di trasmettere in stream di grandi dimensioni. di oggetti strutturati.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.
Eliminazione di un flusso
Il codice seguente mostra una richiesta di eliminazione di uno stream.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per eliminare lo stream, fai clic qui.
Passaggi successivi
- Scopri come utilizzare l'API Datastream per gestire i profili di connessione.
- Scopri come utilizzare l'API Datastream per gestire le configurazioni di connettività privata.
- Per ulteriori informazioni sull'utilizzo dell'API Datastream, consulta la documentazione di riferimento.