Gestione dei flussi

Panoramica

In questa sezione imparerai a utilizzare l'API Datastream per:

  • Crea stream
  • Recupero informazioni su flussi e oggetti flusso
  • Aggiorna i flussi avviando, mettendo in pausa, ripristinandoli e modificandoli, nonché avviando e arrestando il backfill per gli oggetti dei flussi
  • Recupera i flussi di dati non riusciti definitivamente
  • Abilita flusso di oggetti di grandi dimensioni per i flussi Oracle
  • Elimina flussi

Puoi utilizzare l'API Datastream in due modi. Puoi effettuare chiamate API REST oppure utilizzare Google Cloud CLI (CLI).

Per informazioni generali sull'utilizzo di gcloud per gestire i flussi di Datastream, consulta gcloud Datastream Stream.

Crea uno stream

In questa sezione imparerai a creare un flusso da utilizzare per trasferire i dati dall'origine a una destinazione. Gli esempi riportati di seguito non sono esaustivi, ma evidenziano funzionalità specifiche di Datastream. Per affrontare il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento dell'API di Datastream.

Questa sezione riguarda i seguenti casi d'uso:

Esempio 1: creare un flusso di oggetti specifici in BigQuery

In questo esempio imparerai a:

  • Trasmetti il flusso da MySQL a BigQuery
  • Includi un insieme di oggetti nel flusso
  • Esegui il backfill di tutti gli oggetti inclusi nel flusso

Di seguito è riportata una richiesta per eseguire il pull di tutte le tabelle da schema1 e due tabelle specifiche da schema2: tableA e tableC. Gli eventi vengono scritti in un set di dati in BigQuery.

La richiesta non include il parametro customerManagedEncryptionKey, pertanto per criptare i tuoi dati viene utilizzato il sistema di gestione delle chiavi interno di Google Cloud anziché CMEK.

Il parametro backfillAll associato all'esecuzione del backfill (o snapshot) storico è impostato su un dizionario vuoto ({}), il che significa che Datastream esegue il backfill dei dati storici di tutte le tabelle incluse nel flusso.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {}
}

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 2: escludere oggetti specifici da un flusso con un'origine PostgreSQL

In questo esempio imparerai a:

  • Crea flussi da PostgreSQL a BigQuery
  • Escludi oggetti dal flusso
  • Escludi oggetti dal backfill

Il codice seguente mostra una richiesta per creare un flusso da utilizzare per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei un flusso da un database PostgreSQL di origine, devi specificare nella richiesta due campi aggiuntivi specifici per PostgreSQL:

  • replicationSlot: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni flusso.
  • publication: una pubblicazione è un gruppo di tabelle da cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare un flusso. La pubblicazione deve includere almeno le tabelle specificate nell'elenco includeObjects del flusso.

Il parametro backfillAll associato all'esecuzione del backfill (o snapshot) storico è impostato per escludere una tabella.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 3: trasmettere in streaming a una destinazione Cloud Storage

In questo esempio imparerai a:

  • Trasmetti in streaming da Oracle a Cloud Storage
  • Definisci un insieme di oggetti da includere nel flusso
  • Definire una CMEK per la crittografia dei dati at-rest

La richiesta seguente mostra come creare un flusso che scrive gli eventi in un bucket in Cloud Storage.

In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (sovrascrivendo i valori predefiniti di 50 MB e 60 secondi).

Per il formato JSON, puoi:

  • Includi un file di schema dei tipi unificato nel percorso. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file di schema ha lo stesso nome del file di dati, con un'estensione .schema.

  • Abilita la compressione gzip per fare in modo che Datastream comprima i file scritti in Cloud Storage.

Utilizzando il parametro backfillNone, la richiesta specifica che nella destinazione vengono trasmessi solo i flussi di modifiche in corso, senza backfill.

La richiesta specifica il parametro della chiave di crittografia gestita dal cliente che ti consente di controllare le chiavi utilizzate per criptare i dati at-rest all'interno di un progetto Google Cloud. Il parametro fa riferimento alla CMEK utilizzata da Datastream per criptare i dati trasmessi in streaming dall'origine alla destinazione. Specifica inoltre il keyring per la tua CMEK.

Per ulteriori informazioni sui keyring, consulta Risorse Cloud KMS. Per ulteriori informazioni sulla protezione dei dati tramite chiavi di crittografia, vedi Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Convalidare la definizione di flusso

Prima di creare uno stream, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida siano stati superati e che il flusso venga eseguito correttamente al momento della creazione.

Convalida dei controlli dello stream:

  • Indica se l'origine è configurata correttamente per consentire a Datastream di trasmettere dati dall'origine.
  • Indica se il flusso può connettersi sia all'origine sia alla destinazione.
  • La configurazione end-to-end del flusso.

Per convalidare uno stream, aggiungi &validate_only=true all'URL che precede il corpo della richiesta:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Dopo aver effettuato questa richiesta, vedrai i controlli di convalida che Datastream esegue per l'origine e la destinazione e indica se sono stati superati o meno. Per qualsiasi controllo di convalida che non viene superato, vengono visualizzate informazioni sul motivo dell'esito negativo e sulla procedura da seguire per risolvere il problema.

Ad esempio, supponi di avere una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i dati trasmessi in streaming dall'origine alla destinazione. Nell'ambito della convalida del flusso, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzare la chiave. Se una di queste condizioni non è soddisfatta, quando convalidi il flusso verrà visualizzato il seguente messaggio di errore:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Per risolvere il problema, verifica che la chiave che hai fornito esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get per la chiave.

Dopo aver apportato le correzioni necessarie, ripeti la richiesta per assicurarti che tutti i controlli di convalida siano stati superati. Nell'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS non restituirà più un messaggio di errore, ma avrà lo stato PASSED.

Ottenere informazioni su uno stream

Il seguente codice mostra una richiesta per recuperare informazioni su uno stream. Queste informazioni comprendono:

  • Il nome del flusso (identificatore univoco)
  • Un nome facile da usare per lo stream (nome visualizzato)
  • Timestamp della creazione e dell'ultimo aggiornamento dello stream
  • Informazioni sui profili di connessione di origine e di destinazione associati al flusso
  • Lo stato del flusso

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La risposta viene visualizzata come segue:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Per maggiori informazioni sull'uso di gcloud per recuperare le informazioni sul tuo stream, fai clic qui.

Elenca flussi

Il codice seguente mostra una richiesta per recuperare un elenco di tutti i flussi di dati nel progetto e nella località specificati.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per recuperare le informazioni su tutti i tuoi stream, fai clic qui.

Elenco degli oggetti di un flusso

Il seguente codice mostra una richiesta per recuperare informazioni su tutti gli oggetti di un flusso.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Per maggiori informazioni sull'uso di gcloud per recuperare le informazioni su tutti gli oggetti del tuo stream, fai clic qui.

L'elenco degli oggetti che viene restituito potrebbe essere simile al seguente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Per ulteriori informazioni sull'uso di gcloud per elencare gli oggetti di uno stream, fai clic qui.

Avvio di un flusso

Il seguente codice mostra una richiesta di avvio di uno stream.

Se utilizzi il parametro updateMask nella richiesta, solo i campi specificati devono essere inclusi nel corpo della richiesta. Per avviare uno stream, modifica il valore nel campo state da CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per avviare il tuo stream, fai clic qui.

Avviare uno stream da una posizione specifica

Puoi avviare un flusso da una posizione specifica per le origini MySQL e Oracle, ad esempio se vuoi eseguire il backfill utilizzando uno strumento esterno oppure avviare CDC da un file di log da te indicato. Per un'origine MySQL, devi indicare una posizione binlog, per un'origine Oracle, un numero di modifica del sistema (SCN) nel file di log di ripetizione.

Il seguente codice mostra una richiesta di avvio di un flusso già creato da una posizione specifica.

Avvia un flusso da una posizione binlog specifica (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: nome del file di log da cui vuoi avviare il flusso.
  • POSITION: la posizione nel file di log da cui vuoi avviare il flusso. Se non fornisci il valore, Datastream inizia a leggere dall'intestazione del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

L'avvio di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare uno stream, consulta la documentazione di Cloud SDK.

Avvia un flusso da un numero di modifica di sistema specifico nel file di log delle ripetizioni (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero di modifica di sistema (SCN) nel file di log di ripetizione da cui vuoi avviare lo stream. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

L'avvio di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare uno stream, consulta la documentazione di Cloud SDK.

Pausa di un flusso

Il seguente codice mostra una richiesta di mettere in pausa uno stream in esecuzione.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Se metti in pausa lo stream, ne modifichi lo stato da RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per mettere in pausa lo stream, fai clic qui.

Ripresa di un flusso

Il seguente codice mostra una richiesta di riprendere uno stream in pausa.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Se riprendi lo streaming, ne modifichi lo stato da PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per riprendere lo stream, fai clic qui.

Recuperare un flusso

Puoi recuperare un flusso non riuscito definitivamente per un'origine MySQL, Oracle o PostgreSQL utilizzando il metodo RunStream. Ogni tipo di database di origine ha una propria definizione di operazioni possibili di recupero dei flussi. Per maggiori informazioni, consulta Recuperare uno stream.

Recupera un flusso per un'origine MySQL o Oracle

I seguenti esempi di codice mostrano le richieste di recupero di un flusso per un'origine MySQL o Oracle da varie posizioni dei file di log:

REST

Recupera un flusso dalla posizione corrente. Questa è l'opzione predefinita:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recupera uno stream dalla successiva posizione disponibile:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recupera uno stream dalla posizione più recente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recupera un flusso da una posizione specifica (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: nome del file di log da cui vuoi recuperare il flusso
  • POSITION: la posizione nel file di log da cui vuoi recuperare il flusso. Se non fornisci il valore, Datastream recupera il flusso dall'intestazione del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recupera un flusso da una posizione specifica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero di modifica di sistema (SCN) nel file di log di ripetizione da cui vuoi recuperare il flusso. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Per ulteriori informazioni sulle opzioni di recupero disponibili, consulta Recuperare uno stream.

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Recupera un flusso per un'origine PostgreSQL

Il seguente esempio di codice mostra una richiesta di recupero di un flusso per un'origine PostgreSQL. Durante il ripristino, il flusso inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per il flusso.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Se vuoi modificare lo slot di replica, aggiorna prima il flusso con il nuovo nome dello slot di replica:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Modifica di un flusso

Il codice seguente mostra una richiesta di aggiornamento della configurazione della rotazione file di un flusso per ruotare il file ogni 75 MB o 45 secondi.

Per questo esempio, i campi specificati per il parametro updateMask includono i campi fileRotationMb e fileRotationInterval, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Il codice seguente mostra una richiesta per includere un file di schema di tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.

In questo esempio, il campo specificato è il campo jsonFileFormat, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Il codice seguente mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche continue ai dati, dal database di origine alla destinazione.

La sezione oracleExcludedObjects del codice mostra le tabelle e gli schemi per i quali non è possibile eseguire il backfill nella destinazione.

Per questo esempio, verrà eseguito il backfill di tutte le tabelle e gli schemi, ad eccezione di tableA in schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}

gcloud

Per ulteriori informazioni sull'uso di gcloud per modificare il tuo stream, fai clic qui.

Avvia il backfill per un oggetto di un flusso

Un flusso in Datastream può eseguire il backfill di dati storici e il flusso delle modifiche in corso in una destinazione. Le modifiche in corso verranno sempre trasferite in streaming da un'origine a una destinazione. Tuttavia, puoi specificare se vuoi che i dati storici siano trasmessi.

Se vuoi che i dati storici siano trasmessi dall'origine alla destinazione, utilizza il parametro backfillAll.

Datastream consente inoltre di trasmettere dati storici solo per tabelle di database specifiche. A questo scopo, utilizza il parametro backfillAll ed escludi le tabelle per le quali non vuoi ricevere dati storici.

Se vuoi che nella destinazione vengano trasmesse solo le modifiche in corso, utilizza il parametro backfillNone. Se vuoi che Datastream trasmetti in streaming uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.

Un altro motivo per avviare il backfill di un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente alcuni dati nella destinazione, che ora vanno persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di reset" perché tutti i dati vengono trasmessi nella destinazione con un'unica ripresa. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.

Prima di poter avviare il backfill per un oggetto di un flusso, devi recuperare le informazioni sull'oggetto.

Ogni oggetto ha un OBJECT_ID, che identifica in modo univoco l'oggetto. Usa OBJECT_ID per avviare il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per il backfill iniziale di un oggetto del tuo stream, fai clic qui.

Arresta il backfill per un oggetto di un flusso

Dopo aver avviato il backfill per un oggetto di un flusso, puoi interromperlo per l'oggetto. Ad esempio, se un utente modifica uno schema di database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi nella destinazione, quindi interrompi il backfill per l'oggetto.

Puoi anche arrestare il backfill per un oggetto ai fini del bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questa operazione potrebbe aumentare il carico sull'origine. Se il carico è significativo, arresta il backfill per ogni oggetto, quindi avvia il backfill per gli oggetti, uno alla volta.

Prima di poter interrompere il backfill di un oggetto di un flusso, devi effettuare una richiesta per recuperare le informazioni su tutti gli oggetti di un flusso. Ogni oggetto restituito ha un OBJECT_ID, che identifica in modo univoco l'oggetto. Usi OBJECT_ID per interrompere il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per interrompere il backfill di un oggetto del tuo stream, fai clic qui.

Modificare il numero massimo di attività CDC simultanee

Il seguente codice mostra come impostare su 7 il numero massimo di attività CDC (Change Data Capture) massime per uno stream MySQL.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentCdcTasks. Se il valore è impostato su 7, modifichi il numero massimo di attività CDC simultanee dal valore precedente a 7. Puoi utilizzare valori da 0 a 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per il flusso viene impostato il valore predefinito di sistema di 5 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }
}

gcloud

Per ulteriori informazioni sull'uso di gcloud, fai clic qui.

Modifica il numero massimo di attività di backfill simultanee

Il seguente codice mostra come impostare il numero massimo di attività di backfill simultanee per un flusso MySQL su 25.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentBackfillTasks. Se imposti il valore su 25, modifichi il numero massimo di attività di backfill simultanee dal valore precedente a 25. Puoi utilizzare valori da 0 a 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per il flusso viene impostato il valore predefinito di sistema di 16 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }
}

gcloud

Per ulteriori informazioni sull'uso di gcloud, fai clic qui.

Abilita flusso di oggetti di grandi dimensioni per le origini Oracle

Puoi abilitare il flusso di dati di oggetti di grandi dimensioni, ad esempio oggetti binari di grandi dimensioni (BLOB), oggetti di grandi dimensioni con caratteri (CLOB) e oggetti di grandi dimensioni con caratteri nazionali (NCLOB) per i flussi con origini Oracle. Il flag streamLargeObjects consente di includere oggetti di grandi dimensioni sia nei flussi nuovi che in quelli esistenti. Il flag viene impostato a livello di flusso, non è necessario specificare le colonne dei tipi di dati degli oggetti di grandi dimensioni.

L'esempio seguente mostra come creare un flusso che consenta di creare flussi di oggetti di grandi dimensioni.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Per saperne di più sull'utilizzo di gcloud per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.

Eliminazione di un flusso

Il seguente codice mostra una richiesta di eliminazione di uno stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Per ulteriori informazioni sull'uso di gcloud per eliminare il tuo stream, fai clic qui.