Gestione dei flussi

In questa pagina scoprirai come utilizzare l'API Datastream per:

  • Creare stream
  • Visualizzare informazioni su stream e oggetti stream
  • Aggiornare gli stream avviandoli, mettendoli in pausa, riprendendoli e modificandoli, nonché avviando e interrompendo il backfill per gli oggetti stream
  • Recuperare gli stream non riusciti definitivamente
  • Attivare lo streaming di oggetti di grandi dimensioni per gli stream Oracle
  • Eliminare gli stream

Puoi utilizzare l'API Datastream in due modi. Puoi effettuare chiamate all'API REST o utilizzare Google Cloud CLI (interfaccia a riga di comando).

Per informazioni di alto livello sull'utilizzo di gcloud per gestire gli stream di Datastream, consulta Stream di gcloud Datastream.

Creazione di un flusso

In questa sezione imparerai a creare uno stream utilizzato per trasferire i dati dall'origine a una destinazione. Gli esempi riportati di seguito non sono esaustivi, ma mettono in evidenza funzionalità specifiche di Datastream. Per risolvere il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento dell'API Datastream.

Questa sezione illustra i seguenti casi d'uso:

Esempio 1: trasmetti oggetti specifici in streaming a BigQuery

In questo esempio imparerai a:

  • Esegui il flusso da MySQL a BigQuery
  • Includi un insieme di oggetti nello stream
  • Definisci la modalità di scrittura per lo stream come di sola aggiunta
  • Esegui il backfill di tutti gli oggetti inclusi nello stream

Di seguito è riportata una richiesta di estrazione di tutte le tabelle da schema1 e di due tabelle specifiche da schema2: tableA e tableC. Gli eventi vengono scritti in un set di dati in BigQuery.

La richiesta non include il parametro customerManagedEncryptionKey, pertanto per criptare i dati viene utilizzato il sistema di gestione delle chiavi interno di Google Cloud anziché CMEK.

Il parametro backfillAll associato all'esecuzione del backfill (o dello snapshot) storico è impostato su un dizionario vuoto ({}), il che significa che Datastream esegue il backfill dei dati storici di tutte le tabelle incluse nello stream.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 2: escludi oggetti specifici da uno stream con un'origine PostgreSQL

In questo esempio imparerai a:

  • Esegui il flusso da PostgreSQL a BigQuery
  • Escludere oggetti dallo stream
  • Escludere oggetti dal backfill

Il codice seguente mostra una richiesta di creazione di uno stream utilizzato per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei uno stream da un database PostgreSQL di origine, devi specificare nella richiesta altri due campi specifici di PostgreSQL:

  • replicationSlot: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni stream.
  • publication: una pubblicazione è un gruppo di tabelle di cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare uno stream. La pubblicazione deve includere almeno le tabelle specificate nell'elenco includeObjects dello stream.

Il parametro backfillAll associato all'esecuzione del backfill storico (o dello snapshot) è impostato per escludere una tabella.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 3: specifica la modalità di scrittura di sola aggiunta per uno stream

Quando esegui lo streaming in BigQuery, puoi definire la modalità di scrittura: merge o appendOnly. Per ulteriori informazioni, consulta Configurare la modalità di scrittura.

Se non specifichi la modalità di scrittura nella richiesta per creare uno stream, viene utilizzata la modalità predefinitamerge.

La richiesta seguente mostra come definire la modalità appendOnly quando crei un flusso da MySQL a BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 4: streaming in una destinazione Cloud Storage

In questo esempio imparerai a:

  • Stream da Oracle a Cloud Storage
  • Definisci un insieme di oggetti da includere nello stream
  • Definire la chiave CMEK per la crittografia dei dati at-rest

La richiesta seguente mostra come creare uno stream che scrive gli eventi in un bucket in Cloud Storage.

In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (sostituendo i valori predefiniti di 50 MB e 60 secondi).

Per il formato JSON, puoi:

  • Includi un file di schema di tipi unificati nel percorso. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file dello schema ha lo stesso nome del file di dati, con estensione .schema.

  • Attiva la compressione gzip per consentire a Datastream di comprimere i file scritti in Cloud Storage.

Se utilizzi il parametro backfillNone, la richiesta specifica che solo le modifiche in corso vengono importate nella destinazione in streaming, senza backfill.

La richiesta specifica il parametro della chiave di crittografia gestita dal cliente che ti consente di controllare le chiavi utilizzate per criptare i dati inattivi all'interno di un progetto Google Cloud. Il parametro si riferisce al CMEK utilizzato da Datastream per criptare i dati trasmessi in streaming dall'origine alla destinazione. Specifica anche il keyring per la chiave CMEK.

Per ulteriori informazioni sui keyring, consulta le risorse Cloud KMS. Per ulteriori informazioni sulla protezione dei dati mediante chiavi di crittografia, consulta Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Convalida la definizione di uno stream

Prima di creare uno stream, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida vengano superati e che lo stream venga eseguito correttamente al momento della creazione.

La convalida di uno stream verifica:

  • Indica se l'origine è configurata correttamente per consentire a Datastream di trasmettere i dati.
  • Indica se lo stream può connettersi sia all'origine che alla destinazione.
  • La configurazione end-to-end dello stream.

Per convalidare uno stream, aggiungi &validate_only=true all'URL che precede il corpo della richiesta:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Dopo aver effettuato questa richiesta, vedrai i controlli di convalida eseguiti da Datastream per l'origine e la destinazione, nonché se i controlli sono stati superati o meno. Per qualsiasi controllo di convalida non superato, vengono visualizzate informazioni sul motivo dell'errore e su cosa fare per correggere il problema.

Ad esempio, supponiamo che tu abbia una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i dati in streaming dall'origine alla destinazione. Nell'ambito della convalida dello stream, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzarla. Se una di queste condizioni non viene soddisfatta, quando convalidi lo stream viene restituito il seguente messaggio di errore:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Per risolvere il problema, verifica che la chiave che hai fornito esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get per la chiave.

Dopo aver apportato le correzioni del caso, invia di nuovo la richiesta per assicurarti che tutti i controlli di convalida vengano superati. Nell'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS non restituirà più un messaggio di errore, ma avrà uno stato PASSED.

Ricevere informazioni su uno stream

Il seguente codice mostra una richiesta per recuperare informazioni su uno stream. Queste informazioni comprendono:

  • Il nome dello stream (identificatore univoco)
  • Un nome intuitivo per lo stream (nome visualizzato)
  • Timestamp della creazione e dell'ultimo aggiornamento dello stream
  • Informazioni sui profili di connessione di origine e di destinazione associati allo stream
  • Lo stato dello stream

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Viene visualizzata la risposta, come segue:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per recuperare informazioni sul tuo stream, fai clic qui.

Elenco stream

Il codice seguente mostra una richiesta per recuperare un elenco di tutti gli stream nel progetto e nella località specificati.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per recuperare informazioni su tutti i tuoi stream, fai clic qui.

Elenca gli oggetti di uno stream

Il codice seguente mostra una richiesta per recuperare informazioni su tutti gli oggetti di uno stream.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per recuperare informazioni su tutti gli oggetti dello stream, fai clic qui.

L'elenco di oggetti restituiti potrebbe essere simile al seguente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per elencare gli oggetti di uno stream, fai clic qui.

Avvio di un flusso

Il seguente codice mostra una richiesta di avvio di uno stream.

Se utilizzi il parametro updateMask nella richiesta, nel corpo della richiesta devono essere inclusi solo i campi specificati. Per avviare uno stream, modifica il valore nel campo state da CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per avviare lo stream, fai clic qui.

Pausa di un flusso

Il seguente codice mostra una richiesta di messa in pausa di uno stream in esecuzione.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Mettendo in pausa lo stream, ne cambi lo stato da RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per mettere in pausa lo stream, fai clic qui.

Ripresa di un flusso

Il seguente codice mostra una richiesta di ripresa di uno stream in pausa.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Se riprendi lo stream, ne cambi lo stato da PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per riprendere lo stream, fai clic qui.

Recuperare uno stream

Puoi recuperare uno stream con errore permanente utilizzando il metodo RunStream. Ogni tipo di database di origine ha la propria definizione delle operazioni di recupero dello stream possibili. Per ulteriori informazioni, vedi Ripristinare uno stream.

Recuperare uno stream per un'origine MySQL o Oracle

I seguenti esempi di codice mostrano le richieste per recuperare uno stream per un'origine MySQL o Oracle da varie posizioni del file di log:

REST

Recupera uno stream dalla posizione corrente. Questa è l'opzione predefinita:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recuperare uno stream dalla posizione disponibile successiva:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recuperare uno stream dalla posizione più recente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recuperare uno stream da una posizione specifica (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: il nome del file log da cui vuoi recuperare lo stream
  • POSITION: la posizione nel file log da cui vuoi recuperare lo stream. Se non fornisci il valore, Datastream recupera lo stream dall'inizio del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recuperare uno stream da una posizione specifica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero della modifica di sistema (SCN) nel file del log di ripristino da cui vuoi recuperare lo stream. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Per ulteriori informazioni sulle opzioni di recupero disponibili, consulta Recuperare uno stream.

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Recuperare uno stream per un'origine PostgreSQL

Il seguente esempio di codice mostra una richiesta di recupero di uno stream per un'origine PostgreSQL. Durante il recupero, lo stream inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Se vuoi modificare lo slot di replica, aggiorna prima lo stream con il nuovo nome dello slot di replica:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Recuperare uno stream per un'origine SQL Server

I seguenti esempi di codice mostrano richieste di esempio per recuperare uno stream per un'origine SQL Server.

REST

Ripristina uno stream dalla prima posizione disponibile:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Recupera uno stream da un numero di sequenza di log preferito:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Avviare o riprendere uno stream da una posizione specifica

Puoi avviare uno stream o riprendere uno stream in pausa da una posizione specifica per le origini MySQL e Oracle. Questa operazione può essere utile quando vuoi eseguire il backfill utilizzando uno strumento esterno o avviare la CDC da una posizione da te indicata. Per un'origine MySQL, devi indicare una posizione del log binario, per un'origine Oracle, un numero della modifica di sistema (SCN) nel file del log di ripristino.

Il codice seguente mostra una richiesta di avvio o ripresa di uno stream già creato da una posizione specifica.

Avvia o riprendi uno stream da una posizione del file binlog specifica (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: il nome del file log da cui vuoi avviare lo stream.
  • POSITION: la posizione nel file di log da cui vuoi avviare lo stream. Se non fornisci il valore, Datastream inizia a leggere dall'inizio del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.

Avvia o riprendi uno stream da un numero della modifica di sistema specifico nel file di log di ripetizione (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero della modifica di sistema (SCN) nel file del log di ripristino da cui vuoi avviare lo stream. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare uno stream, consulta la documentazione di Cloud SDK.

Modifica di un flusso

Il codice seguente mostra una richiesta di aggiornamento della configurazione della rotazione dei file di uno stream per ruotare il file ogni 75 MB o 45 secondi.

Per questo esempio, i campi specificati per il parametro updateMask includono i campi fileRotationMb e fileRotationInterval, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Il codice seguente mostra una richiesta di inclusione di un file di schema di tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.

Per questo esempio, il campo specificato è il campo jsonFileFormat, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Il codice seguente mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche in corso ai dati, dal database di origine a quello di destinazione.

La sezione oracleExcludedObjects del codice mostra le tabelle e gli schemi di cui è vietato il backfill nella destinazione.

Per questo esempio, verrà eseguito il backfill di tutte le tabelle e di tutti gli schemi, ad eccezione della tabella A nello schema 3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per modificare lo stream, fai clic qui.

Avvia il backfill per un oggetto di uno stream

Uno stream in Datastream può eseguire il backfill dei dati storici, nonché trasmettere le modifiche in corso in una destinazione. Le modifiche in corso verranno sempre trasmesse in streaming da un'origine a una destinazione. Tuttavia, puoi specificare se vuoi che i dati storici vengano trasmessi in streaming.

Se vuoi che i dati storici vengano trasmessi in streaming dall'origine alla destinazione, utilizza il parametro backfillAll.

Datastream ti consente anche di eseguire lo streaming dei dati storici solo per tabelle di database specifiche. A questo scopo, utilizza il parametro backfillAll ed escludi le tabelle per le quali non vuoi i dati storici.

Se vuoi che solo le modifiche in corso vengano trasmesse in streaming alla destinazione, utilizza il parametro backfillNone. Se vuoi che Datastream trasmetta un flusso di uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.

Un altro motivo per avviare il backfill di un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente i dati nella destinazione, che vengono quindi persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di reimpostazione" perché tutti i dati vengono trasmessi in streaming alla destinazione in un solo passaggio. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.

Prima di poter avviare il backfill per un oggetto di uno stream, devi recuperare le informazioni sull'oggetto.

Ogni oggetto ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per avviare il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per avviare il backfill di un oggetto dello stream, consulta la documentazione di Google Cloud SDK.

Interrompere il backfill per un oggetto di uno stream

Dopo aver avviato il backfill per un oggetto di uno stream, puoi interromperlo. Ad esempio, se un utente modifica lo schema di un database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi in streaming alla destinazione, quindi interrompi il backfill per l'oggetto.

Puoi anche interrompere il backfill di un oggetto per il bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questa operazione potrebbe applicare un carico supplementare sull'origine. Se il carico è elevato, interrompi il backfill per ogni oggetto e poi avvialo per gli oggetti uno alla volta.

Prima di poter interrompere il backfill per un oggetto di uno stream, devi inviare una richiesta per recuperare le informazioni su tutti gli oggetti di uno stream. Ogni oggetto restituito ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per interrompere il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per interrompere il backfill di un oggetto dello stream, fai clic qui.

Modificare il numero massimo di attività CDC simultanee

Il seguente codice mostra come impostare il numero massimo di attività CDC (Change Data Capture) concorrenti per uno stream MySQL su 7.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentCdcTasks. Impostando il valore su 7, modifichi il numero di attività CDC contemporanee massime dal valore precedente a 7. Puoi utilizzare valori compresi tra 0 e 50 (incluso). Se non definisci il valore o lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 5 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud, fai clic qui.

Modificare il numero massimo di attività di backfill simultanee

Il seguente codice mostra come impostare il numero massimo di attività di backfill contemporanee per uno stream MySQL su 25.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentBackfillTasks. Impostando il valore su 25, modifichi il numero di attività di backfill contemporanee massime dal valore precedente a 25. Puoi utilizzare valori compresi tra 0 e 50 (incluso). Se non definisci il valore o lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 16 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud, fai clic qui.

Attivare lo streaming di oggetti di grandi dimensioni per le origini Oracle

Puoi attivare lo streaming di oggetti di grandi dimensioni, come oggetti di grandi dimensioni binari (BLOB), oggetti di grandi dimensioni con caratteri (CLOB) e oggetti di grandi dimensioni con caratteri nazionali (NCLOB) per gli stream con origini Oracle. Il flag streamLargeObjects ti consente di includere oggetti di grandi dimensioni sia negli stream nuovi che esistenti. Il flag viene impostato a livello di stream, non è necessario specificare le colonne dei tipi di dati di oggetti di grandi dimensioni.

L'esempio seguente mostra come creare uno stream che ti consenta di trasmettere oggetti di grandi dimensioni.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.

Eliminazione di un flusso

Il seguente codice mostra una richiesta di eliminazione di uno stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per eliminare lo stream, fai clic qui.

Passaggi successivi