In questa pagina imparerai a utilizzare l'API Datastream per:
- Creare stream
- Ricevere informazioni su flussi e oggetti di flusso
- Aggiorna i flussi di dati avviandoli, mettendoli in pausa, riprendendoli e modificandoli, nonché avviando e interrompendo il backfill per gli oggetti del flusso di dati
- Recuperare gli stream con errori permanenti
- Attiva lo streaming di oggetti di grandi dimensioni per i flussi Oracle
- Eliminare stream
Esistono due modi per utilizzare l'API Datastream. Puoi effettuare chiamate API REST o utilizzare Google Cloud CLI (CLI).
Per informazioni di alto livello sull'utilizzo di Google Cloud CLI
per gestire gli stream Datastream, consulta Stream Datastream di gcloud CLI.
Crea uno stream
In questa sezione imparerai a creare uno stream utilizzato per trasferire i dati dall'origine a una destinazione. Gli esempi che seguono non sono esaustivi, ma mettono in evidenza funzionalità specifiche di Datastream. Per risolvere il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento dell'API Datastream.
Questa sezione tratta i seguenti casi d'uso:
- Streaming da Oracle a Cloud Storage
- Flusso di dati da MySQL a BigQuery
- Flusso di dati da PostgreSQL a BigQuery
- Definisci un insieme di oggetti da includere nello stream
- Eseguire il backfill di tutti gli oggetti inclusi nel flusso
- Escludere oggetti dallo stream
- Escludere oggetti dal backfill
- Definisci CMEK per la crittografia dei dati at-rest
- Definire la modalità di scrittura per un flusso
- Trasmettere in streaming a un altro progetto in BigQuery
- Trasmetti dati in streaming alle tabelle gestite BigLake
Esempio 1: trasmettere flussi di oggetti specifici a BigQuery
In questo esempio imparerai a:
- Flusso di dati da MySQL a BigQuery
- Includere un insieme di oggetti nel flusso
- Definisci la modalità di scrittura per lo stream come di sola aggiunta
- Esegui il backfill di tutti gli oggetti inclusi nel flusso
Di seguito è riportata una richiesta per estrarre tutte le tabelle da schema1
e due tabelle specifiche da schema2
: tableA
e tableC
. Gli eventi vengono scritti in un set di dati
in BigQuery.
La richiesta non include il parametro customerManagedEncryptionKey
, pertanto
il sistema di gestione delle chiavi interno Google Cloud viene utilizzato per criptare i dati
anziché CMEK.
Il parametro backfillAll
associato all'esecuzione del backfill (o dello snapshot) storico è impostato su un dizionario vuoto ({}
), il che significa che Datastream esegue il backfill dei dati storici di tutte le tabelle incluse nello stream.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 2: escludere oggetti specifici da un flusso con un'origine PostgreSQL
In questo esempio imparerai a:
- Flusso di dati da PostgreSQL a BigQuery
- Escludere oggetti dallo stream
- Escludere oggetti dal backfill
Il seguente codice mostra una richiesta per creare uno stream utilizzato per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei uno stream da un database PostgreSQL di origine, devi specificare due campi aggiuntivi specifici di PostgreSQL nella richiesta:
replicationSlot
: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni stream.publication
: una pubblicazione è un gruppo di tabelle da cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare uno stream. Come minimo, la pubblicazione deve includere le tabelle specificate nell'elencoincludeObjects
del flusso.
Il parametro backfillAll
associato all'esecuzione del backfill storico (o dello snapshot) è impostato per escludere una tabella.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 3: specifica la modalità di scrittura di sola aggiunta per un flusso
Quando esegui lo streaming in BigQuery, puoi definire la modalità di scrittura: merge
o
appendOnly
. Per saperne di più, consulta Configurare la modalità di scrittura.
Se non specifichi la modalità di scrittura nella richiesta di creazione di uno stream, viene utilizzata la modalità
merge
predefinita.
La seguente richiesta mostra come definire la modalità appendOnly
quando crei
un flusso da MySQL a BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 4: trasmetti in streaming a un progetto diverso in BigQuery
Se hai creato le risorse Datastream in un progetto, ma vuoi trasmettere in streaming a un altro progetto in BigQuery, puoi farlo utilizzando una richiesta simile a quella riportata di seguito.
Se specifichi sourceHierarchyDatasets
per il set di dati di destinazione, devi compilare il campo projectId
.
Se specifichi singleTargetDataset
per il set di dati di destinazione, compila il campo datasetId
nel formato projectId:datasetId
.
REST
Per sourceHierarchyDatasets
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" }, "projectId": "myProjectId2" } } }, "backfillAll": {} }
Per singleTargetDataset
:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2 { "displayName": "My cross-project stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "singleTargetDataset": { "datasetId": "myProjectId2:myDatasetId" }, } }, "backfillAll": {} }
gcloud
Per sourceHierarchyDatasets
:
datastream streams create crossProjectBqStream1 --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json --backfill-none
I contenuti del file di configurazione source_hierarchy_cross_project_config.json
:
{"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
Per singleTargetDataset
:
datastream streams create crossProjectBqStream --location=us-central1 --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json --backfill-none
I contenuti del file di configurazione single_target_cross_project_config.json
:
{"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 5: trasmettere in streaming a una destinazione Cloud Storage
In questo esempio imparerai a:
- Streaming da Oracle a Cloud Storage
- Definisci un insieme di oggetti da includere nello stream
- Definisci CMEK per criptare i dati at-rest
La seguente richiesta mostra come creare un flusso che scrive gli eventi in un bucket in Cloud Storage.
In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (sovrascrivendo i valori predefiniti di 50 MB e 60 secondi).
Per il formato JSON, puoi:
Includi un file di schema tipi unificati nel percorso. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file di schema ha lo stesso nome del file di dati, con estensione
.schema
.Attiva la compressione gzip per consentire a Datastream di comprimere i file scritti in Cloud Storage.
Utilizzando il parametro backfillNone
, la richiesta specifica che solo le modifiche in corso vengono trasmesse in streaming alla destinazione, senza backfill.
La richiesta specifica il parametro della chiave di crittografia gestita dal cliente, che ti consente di controllare le chiavi utilizzate per criptare i dati at-rest all'interno di un progetto Google Cloud . Il parametro si riferisce alla chiave CMEK che Datastream utilizza per criptare i dati trasmessi in streaming dall'origine alla destinazione. Specifica anche il keyring per la chiave CMEK.
Per ulteriori informazioni sui keyring, consulta Risorse Cloud KMS. Per saperne di più sulla protezione dei dati mediante chiavi di crittografia, consulta Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Per saperne di più sull'utilizzo di gcloud
per creare uno stream, consulta la documentazione di Google Cloud SDK.
Esempio 6: trasmetti in streaming a una tabella gestita BigLake
In questo esempio, imparerai a configurare uno stream per replicare i dati da un database MySQL a una tabella BigLake Iceberg in modalità append-only
.
Prima di creare la richiesta, assicurati di aver completato i seguenti
passaggi:
- Avere un bucket Cloud Storage in cui vuoi archiviare i dati
- Crea una connessione risorsa Cloud
- Concedi l'accesso alla connessione di risorsa Cloud al bucket Cloud Storage
Puoi quindi utilizzare la seguente richiesta per creare lo stream:
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream { "displayName": "MySQL to BigLake stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "my-mysql-database" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" , "bigqueryDestinationConfig": { "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": { "datasetId": "my-project-id:my-bigquery-dataset-id" }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
datastream streams create mysqlBigLakeStream --location=us-central1 --display-name=mysql-to-bl-stream --source=source--mysql-source-config=mysql_source_config.json --destination=destination --bigquery-destination-config=bl_config.json --backfill-none
I contenuti del file di configurazione dell'origine mysql_source_config.json
:
{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{ "database":"my-mysql-database"}]}}
I contenuti del file di configurazione bl_config.json
:
{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder","connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }
Terraform
resource "google_datastream_stream" "stream" { stream_id = "mysqlBlStream" location = "us-central1" display_name = "MySQL to BigLake stream" source_config { source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp" mysql_source_config { include_objects { mysql_databases { database = "my-mysql-database" } } } } destination_config { destination_connection_profile ="projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id" bigquery_destination_config { single_target_dataset { dataset_id = "my-project-id:my-bigquery-dataset-id" } blmt_config { bucket = "my-gcs-bucket-name" table_format = "ICEBERG" file_format = "PARQUET" connection_name = "my-project-id.us-central1.my-bigquery-connection-name" root_path = "my/folder" } append_only {} } } backfill_none {} }
Convalidare la definizione di uno stream
Prima di creare uno stream, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida vengano superati e che lo stream venga eseguito correttamente una volta creato.
La convalida di uno stream controlla:
- Se l'origine è configurata correttamente per consentire a Datastream di trasmettere in streaming i dati.
- Se lo stream può connettersi sia all'origine che alla destinazione.
- La configurazione end-to-end dello stream.
Per convalidare un flusso, aggiungi &validate_only=true
all'URL prima del corpo della richiesta:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Dopo aver effettuato questa richiesta, vedrai i controlli di convalida eseguiti da Datastream per l'origine e la destinazione, nonché se i controlli vengono superati o meno. Per ogni controllo di convalida non superato, vengono visualizzate informazioni sul motivo dell'errore e su cosa fare per risolvere il problema.
Ad esempio, supponiamo che tu abbia una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i dati trasmessi in streaming dall'origine alla destinazione. Nell'ambito della convalida dello stream, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzarla. Se una di queste condizioni non viene soddisfatta, quando convalidi lo stream, viene restituito il seguente messaggio di errore:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Per risolvere il problema, verifica che la chiave che hai fornito esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get
per la chiave.
Dopo aver apportato le correzioni appropriate, invia di nuovo la richiesta per assicurarti che tutti i controlli di convalida vengano superati. Per l'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS
non restituirà più un messaggio di errore, ma avrà lo stato PASSED
.
Ottenere informazioni su uno stream
Il seguente codice mostra una richiesta per recuperare informazioni su uno stream. Queste informazioni comprendono:
- Il nome dello stream (identificatore univoco)
- Un nome semplice per lo stream (nome visualizzato)
- Timestamp della creazione e dell'ultimo aggiornamento dello stream
- Informazioni sui profili di connessione di origine e destinazione associati allo stream
- Lo stato dello stream
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
La risposta viene visualizzata nel seguente modo:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per recuperare informazioni sul tuo stream, consulta la documentazione di Google Cloud SDK.
Elenco stream
Il seguente codice mostra una richiesta per recuperare un elenco di tutti gli stream nel progetto e nella località specificati.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Per saperne di più sull'utilizzo di gcloud
per recuperare informazioni su tutti i tuoi stream, consulta la documentazione di Google Cloud SDK.
Elenca gli oggetti di uno stream
Il seguente codice mostra una richiesta per recuperare informazioni su tutti gli oggetti di un flusso.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Per saperne di più sull'utilizzo di gcloud
per recuperare informazioni su tutti gli oggetti del flusso, consulta la documentazione di Google Cloud SDK.
L'elenco degli oggetti restituiti potrebbe essere simile al seguente:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Per saperne di più sull'utilizzo di gcloud
per elencare gli oggetti di un flusso, consulta la documentazione di Google Cloud SDK.
Avvio di un flusso
Il seguente codice mostra una richiesta per avviare uno stream.
Se utilizzi il parametro updateMask
nella richiesta, nel corpo della richiesta devono essere inclusi solo i campi che specifichi. Per avviare un flusso, modifica il valore nel campo state
da CREATED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per saperne di più sull'utilizzo di gcloud
per avviare lo stream, consulta la documentazione di Google Cloud SDK.
Pausa di un flusso
Il seguente codice mostra una richiesta per mettere in pausa un flusso in esecuzione.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo state
. Se metti in pausa lo stream, il suo stato cambia da RUNNING
a PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Per saperne di più sull'utilizzo di gcloud
per mettere in pausa lo stream, consulta la documentazione di Google Cloud SDK.
Ripresa di un flusso
Il seguente codice mostra una richiesta per riprendere uno stream in pausa.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo state
. Se riprendi lo stream, il suo stato cambia da PAUSED
a RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Per saperne di più sull'utilizzo di gcloud
per riprendere lo stream, consulta la documentazione di Google Cloud SDK.
Recuperare uno stream
Puoi recuperare un flusso di dati non riuscito in modo permanente utilizzando il metodo RunStream
. Ogni tipo di database di origine ha una propria definizione delle operazioni di recupero dello stream possibili. Per saperne di più, consulta Recuperare uno stream.
Recuperare uno stream per un'origine MySQL o Oracle
I seguenti esempi di codice mostrano le richieste per recuperare un flusso per un'origine MySQL o Oracle da varie posizioni del file di log:
REST
Recupera un flusso dalla posizione corrente. Questa è l'opzione predefinita:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Recuperare un flusso dalla successiva posizione disponibile:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Recuperare uno stream dalla posizione più recente:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Recupera un flusso da una posizione specifica (replica basata su binlog MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi recuperare lo stream
- POSITION: La posizione nel file di log da cui vuoi recuperare lo stream. Se non fornisci il valore, Datastream recupera lo stream dall'inizio del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Recupera un flusso da una posizione specifica (replica basata su GTID di MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Sostituisci GTID_SET con uno o più GTID singoli o intervalli di GTID da cui vuoi recuperare lo stream.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3" } } } }
Recupera un flusso da una posizione specifica (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Per saperne di più sulle opzioni di recupero disponibili, vedi Recuperare uno stream.
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recuperare uno stream per un'origine PostgreSQL
Il seguente esempio di codice mostra una richiesta di recupero di uno stream per un'origine PostgreSQL. Durante il recupero, lo stream inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Se vuoi modificare lo slot di replica, aggiorna prima lo stream con il nuovo nome dello slot di replica:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Recuperare uno stream per un'origine SQL Server
I seguenti esempi di codice mostrano richieste di esempio per recuperare un flusso per un'origine SQL Server.
REST
Recuperare uno stream dalla prima posizione disponibile:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Recupera un flusso da un numero di sequenza di log preferito:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Il recupero di uno stream utilizzando gcloud
non è supportato.
Avviare o riprendere uno stream da una posizione specifica
Puoi avviare un flusso o riprenderne uno in pausa da una posizione specifica per le origini MySQL e Oracle. Questo può essere utile quando vuoi eseguire il backfill utilizzando uno strumento esterno o avviare CDC da una posizione che indichi. Per un'origine MySQL, devi indicare una posizione binlog o un insieme GTID, mentre per un'origine Oracle, un numero della modifica di sistema (SCN) nel file di log di ripristino.
Il seguente codice mostra una richiesta per avviare o riprendere uno stream già creato da una posizione specifica.
Avvia o riprendi uno stream da una posizione binlog specifica (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Sostituisci quanto segue:
- NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi avviare lo stream.
- POSITION: la posizione nel file di log da cui vuoi iniziare lo stream. Se non fornisci il valore, Datastream inizia a leggere dall'inizio del file.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.
Avvia o riprendi uno stream da un set GTID specifico (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "GTID_SET" } } } }
Sostituisci GTID_SET con uno o più ID transazione globali singoli o intervalli di ID transazione globali da cui vuoi iniziare o riprendere lo stream.
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlGtidPosition": { "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7" } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.
Avvia o riprendi un flusso da un numero della modifica di sistema specifico nel file di log di ripristino (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Ad esempio:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud
non è supportato. Per informazioni sull'utilizzo di gcloud
per avviare uno stream, consulta la documentazione di Cloud SDK.
Modifica di un flusso
Il seguente codice mostra una richiesta di aggiornamento della configurazione di rotazione dei file di un flusso per ruotare il file ogni 75 MB o 45 secondi.
Per questo esempio, i campi specificati per il parametro updateMask
includono i campi fileRotationMb
e fileRotationInterval
, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb
e destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Il seguente codice mostra una richiesta per includere un file di schema tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.
Per questo esempio, il campo specificato è jsonFileFormat
, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Il seguente codice mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche in corso ai dati, dal database di origine alla destinazione.
La sezione oracleExcludedObjects
del codice mostra le tabelle e gli schemi per cui è limitato il riempimento retroattivo nella destinazione.
Per questo esempio, verranno eseguito il backfill di tutte le tabelle e di tutti gli schemi, ad eccezione di tableA in schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per modificare il flusso, consulta la documentazione di Google Cloud SDK.
Avviare il backfill per un oggetto di un flusso
Un flusso in Datastream può eseguire il backfill dei dati storici, nonché trasmettere in streaming le modifiche in corso a una destinazione. Le modifiche in corso verranno sempre trasmesse in streaming da un'origine a una destinazione. Tuttavia, puoi specificare se vuoi che i dati storici vengano trasmessi in streaming.
Se vuoi che i dati storici vengano trasmessi in streaming dall'origine alla destinazione, utilizza il parametro backfillAll
.
Datastream ti consente anche di trasmettere in streaming i dati storici solo per tabelle di database specifiche. A questo scopo, utilizza il parametro backfillAll
ed escludi le tabelle per le quali non vuoi dati storici.
Se vuoi che nella destinazione vengano trasmessi in streaming solo i cambiamenti in corso, utilizza il parametro backfillNone
. Se poi vuoi che Datastream trasmetta in streaming uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.
Un altro motivo per avviare il backfill per un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente i dati nella destinazione e ora i dati sono persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di ripristino" perché tutti i dati vengono trasmessi in streaming nella destinazione in un'unica operazione. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.
Prima di poter avviare il backfill per un oggetto di uno stream, devi recuperare le informazioni sull'oggetto.
Ogni oggetto ha un OBJECT_ID, che lo identifica in modo univoco. Utilizzi OBJECT_ID per avviare il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Per saperne di più sull'utilizzo di gcloud
per avviare il backfill per un oggetto del flusso, consulta la documentazione di Google Cloud SDK.
Arrestare il backfill per un oggetto di un flusso
Dopo aver avviato il backfill per un oggetto di un flusso, puoi arrestarlo per l'oggetto. Ad esempio, se un utente modifica uno schema di database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi in streaming alla destinazione, quindi interrompi il backfill per l'oggetto.
Puoi anche interrompere il backfill per un oggetto ai fini del bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questa operazione può aumentare il carico sull'origine. Se il carico è significativo, interrompi il backfill per ogni oggetto, quindi avvialo per gli oggetti uno alla volta.
Prima di poter interrompere il backfill per un oggetto di uno stream, devi inviare una richiesta per recuperare informazioni su tutti gli oggetti di uno stream. Ogni oggetto restituito ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per interrompere il backfill per lo stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Per saperne di più sull'utilizzo di gcloud
per interrompere il backfill per un oggetto del flusso, consulta la documentazione di Google Cloud SDK.
Modificare il numero massimo di attività CDC simultanee
Il seguente codice mostra come impostare il numero massimo di attività CDC (Change Data Capture) simultanee per un flusso MySQL su 7.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo maxConcurrentCdcTasks
. Se imposti il valore su 7, modifichi il numero massimo di attività CDC simultanee dal valore precedente a 7. Puoi utilizzare valori compresi tra 0 e 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 5 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
, consulta la documentazione di Google Cloud SDK.
Modificare il numero massimo di attività di backfill simultanee
Il seguente codice mostra come impostare il numero massimo di attività di backfill simultanee per un flusso MySQL su 25.
Per questo esempio, il campo specificato per il parametro updateMask
è il campo maxConcurrentBackfillTasks
. Se imposti il valore su 25, modifichi il numero massimo di attività di backfill simultanee dal valore precedente a 25. Puoi utilizzare valori compresi tra 0 e 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema pari a 16 attività.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
, consulta la documentazione di Google Cloud SDK.
Abilita lo streaming di oggetti di grandi dimensioni per le origini Oracle
Puoi attivare lo streaming di oggetti di grandi dimensioni, come oggetti binari di grandi dimensioni (BLOB
),
oggetti di grandi dimensioni di tipo carattere (CLOB
) e oggetti di grandi dimensioni di tipo carattere nazionale (NCLOB
)
per i flussi con origini Oracle. Il flag streamLargeObjects
consente di includere
oggetti di grandi dimensioni sia nei flussi nuovi che in quelli esistenti. Il flag è impostato a livello di stream,
non devi specificare le colonne dei tipi di dati di oggetti di grandi dimensioni.
L'esempio seguente mostra come creare un flusso che consente di trasmettere in streaming oggetti di grandi dimensioni.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Per saperne di più sull'utilizzo di gcloud
per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.
Eliminazione di un flusso
Il seguente codice mostra una richiesta di eliminazione di uno stream.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Per ulteriori informazioni sull'utilizzo di gcloud
per eliminare lo stream, consulta la documentazione di Google Cloud SDK.
Passaggi successivi
- Scopri come utilizzare l'API Datastream per gestire i profili di connessione.
- Scopri come utilizzare l'API Datastream per gestire le configurazioni di connettività privata.
- Per saperne di più sull'utilizzo dell'API Datastream, consulta la documentazione di riferimento.