Gestione dei flussi

In questa pagina imparerai a utilizzare l'API Datastream per:

  • Creare stream
  • Ricevere informazioni su flussi e oggetti di flusso
  • Aggiorna i flussi di dati avviandoli, mettendoli in pausa, riprendendoli e modificandoli, nonché avviando e interrompendo il backfill per gli oggetti del flusso di dati
  • Recuperare gli stream con errori permanenti
  • Attiva lo streaming di oggetti di grandi dimensioni per i flussi Oracle
  • Eliminare stream

Esistono due modi per utilizzare l'API Datastream. Puoi effettuare chiamate API REST o utilizzare Google Cloud CLI (CLI).

Per informazioni di alto livello sull'utilizzo di Google Cloud CLI per gestire gli stream Datastream, consulta Stream Datastream di gcloud CLI.

Crea uno stream

In questa sezione imparerai a creare uno stream utilizzato per trasferire i dati dall'origine a una destinazione. Gli esempi che seguono non sono esaustivi, ma mettono in evidenza funzionalità specifiche di Datastream. Per risolvere il tuo caso d'uso specifico, utilizza questi esempi insieme alla documentazione di riferimento dell'API Datastream.

Questa sezione tratta i seguenti casi d'uso:

Esempio 1: trasmettere flussi di oggetti specifici a BigQuery

In questo esempio imparerai a:

  • Flusso di dati da MySQL a BigQuery
  • Includere un insieme di oggetti nel flusso
  • Definisci la modalità di scrittura per lo stream come di sola aggiunta
  • Esegui il backfill di tutti gli oggetti inclusi nel flusso

Di seguito è riportata una richiesta per estrarre tutte le tabelle da schema1 e due tabelle specifiche da schema2: tableA e tableC. Gli eventi vengono scritti in un set di dati in BigQuery.

La richiesta non include il parametro customerManagedEncryptionKey, pertanto il sistema di gestione delle chiavi interno Google Cloud viene utilizzato per criptare i dati anziché CMEK.

Il parametro backfillAll associato all'esecuzione del backfill (o dello snapshot) storico è impostato su un dizionario vuoto ({}), il che significa che Datastream esegue il backfill dei dati storici di tutte le tabelle incluse nello stream.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 2: escludere oggetti specifici da un flusso con un'origine PostgreSQL

In questo esempio imparerai a:

  • Flusso di dati da PostgreSQL a BigQuery
  • Escludere oggetti dallo stream
  • Escludere oggetti dal backfill

Il seguente codice mostra una richiesta per creare uno stream utilizzato per trasferire i dati da un database PostgreSQL di origine a BigQuery. Quando crei uno stream da un database PostgreSQL di origine, devi specificare due campi aggiuntivi specifici di PostgreSQL nella richiesta:

  • replicationSlot: uno slot di replica è un prerequisito per la configurazione di un database PostgreSQL per la replica. Devi creare uno slot di replica per ogni stream.
  • publication: una pubblicazione è un gruppo di tabelle da cui vuoi replicare le modifiche. Il nome della pubblicazione deve esistere nel database prima di avviare uno stream. Come minimo, la pubblicazione deve includere le tabelle specificate nell'elenco includeObjects del flusso.

Il parametro backfillAll associato all'esecuzione del backfill storico (o dello snapshot) è impostato per escludere una tabella.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 3: specifica la modalità di scrittura di sola aggiunta per un flusso

Quando esegui lo streaming in BigQuery, puoi definire la modalità di scrittura: merge o appendOnly. Per saperne di più, consulta Configurare la modalità di scrittura.

Se non specifichi la modalità di scrittura nella richiesta di creazione di uno stream, viene utilizzata la modalità merge predefinita.

La seguente richiesta mostra come definire la modalità appendOnly quando crei un flusso da MySQL a BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 4: trasmetti in streaming a un progetto diverso in BigQuery

Se hai creato le risorse Datastream in un progetto, ma vuoi trasmettere in streaming a un altro progetto in BigQuery, puoi farlo utilizzando una richiesta simile a quella riportata di seguito.

Se specifichi sourceHierarchyDatasets per il set di dati di destinazione, devi compilare il campo projectId.

Se specifichi singleTargetDataset per il set di dati di destinazione, compila il campo datasetId nel formato projectId:datasetId.

REST

Per sourceHierarchyDatasets:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream1
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        },
        "projectId": "myProjectId2"
      }
    }
  },
  "backfillAll": {}
}

Per singleTargetDataset:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=crossProjectBqStream2
{
  "displayName": "My cross-project stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "singleTargetDataset": {
        "datasetId": "myProjectId2:myDatasetId"
      },
    }
  },
  "backfillAll": {}
}

gcloud

Per sourceHierarchyDatasets:

  datastream streams create crossProjectBqStream1 --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=source_hierarchy_cross_project_config.json
  --backfill-none
  

I contenuti del file di configurazione source_hierarchy_cross_project_config.json:

  {"sourceHierarchyDatasets": {"datasetTemplate": {"location": "us-central1", "datasetIdPrefix": "prefix_"}, "projectId": "myProjectId2"}}
  

Per singleTargetDataset:

  datastream streams create crossProjectBqStream --location=us-central1
  --display-name=my-cross-project-stream --source=source-cp --mysql-source-config=mysql_source_config.json
  --destination=destination-cp --bigquery-destination-config=single_target_cross_project_config.json
  --backfill-none
  

I contenuti del file di configurazione single_target_cross_project_config.json:

  {"singleTargetDataset": {"datasetId": "myProjectId2:myDatastetId"}}
  

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 5: trasmettere in streaming a una destinazione Cloud Storage

In questo esempio imparerai a:

  • Streaming da Oracle a Cloud Storage
  • Definisci un insieme di oggetti da includere nello stream
  • Definisci CMEK per criptare i dati at-rest

La seguente richiesta mostra come creare un flusso che scrive gli eventi in un bucket in Cloud Storage.

In questa richiesta di esempio, gli eventi vengono scritti nel formato di output JSON e viene creato un nuovo file ogni 100 MB o 30 secondi (sovrascrivendo i valori predefiniti di 50 MB e 60 secondi).

Per il formato JSON, puoi:

  • Includi un file di schema tipi unificati nel percorso. Di conseguenza, Datastream scrive due file in Cloud Storage: un file di dati JSON e un file di schema Avro. Il file di schema ha lo stesso nome del file di dati, con estensione .schema.

  • Attiva la compressione gzip per consentire a Datastream di comprimere i file scritti in Cloud Storage.

Utilizzando il parametro backfillNone, la richiesta specifica che solo le modifiche in corso vengono trasmesse in streaming alla destinazione, senza backfill.

La richiesta specifica il parametro della chiave di crittografia gestita dal cliente, che ti consente di controllare le chiavi utilizzate per criptare i dati at-rest all'interno di un progetto Google Cloud . Il parametro si riferisce alla chiave CMEK che Datastream utilizza per criptare i dati trasmessi in streaming dall'origine alla destinazione. Specifica anche il keyring per la chiave CMEK.

Per ulteriori informazioni sui keyring, consulta Risorse Cloud KMS. Per saperne di più sulla protezione dei dati mediante chiavi di crittografia, consulta Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Per saperne di più sull'utilizzo di gcloud per creare uno stream, consulta la documentazione di Google Cloud SDK.

Esempio 6: trasmetti in streaming a una tabella gestita BigLake

In questo esempio, imparerai a configurare uno stream per replicare i dati da un database MySQL a una tabella BigLake Iceberg in modalità append-only. Prima di creare la richiesta, assicurati di aver completato i seguenti passaggi:

  • Avere un bucket Cloud Storage in cui vuoi archiviare i dati
  • Crea una connessione risorsa Cloud
  • Concedi l'accesso alla connessione di risorsa Cloud al bucket Cloud Storage

Puoi quindi utilizzare la seguente richiesta per creare lo stream:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

I contenuti del file di configurazione dell'origine mysql_source_config.json:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

I contenuti del file di configurazione bl_config.json:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

Convalidare la definizione di uno stream

Prima di creare uno stream, puoi convalidarne la definizione. In questo modo, puoi assicurarti che tutti i controlli di convalida vengano superati e che lo stream venga eseguito correttamente una volta creato.

La convalida di uno stream controlla:

  • Se l'origine è configurata correttamente per consentire a Datastream di trasmettere in streaming i dati.
  • Se lo stream può connettersi sia all'origine che alla destinazione.
  • La configurazione end-to-end dello stream.

Per convalidare un flusso, aggiungi &validate_only=true all'URL prima del corpo della richiesta:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Dopo aver effettuato questa richiesta, vedrai i controlli di convalida eseguiti da Datastream per l'origine e la destinazione, nonché se i controlli vengono superati o meno. Per ogni controllo di convalida non superato, vengono visualizzate informazioni sul motivo dell'errore e su cosa fare per risolvere il problema.

Ad esempio, supponiamo che tu abbia una chiave di crittografia gestita dal cliente (CMEK) che vuoi che Datastream utilizzi per criptare i dati trasmessi in streaming dall'origine alla destinazione. Nell'ambito della convalida dello stream, Datastream verificherà che la chiave esista e che Datastream disponga delle autorizzazioni per utilizzarla. Se una di queste condizioni non viene soddisfatta, quando convalidi lo stream, viene restituito il seguente messaggio di errore:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Per risolvere il problema, verifica che la chiave che hai fornito esista e che l'account di servizio Datastream disponga dell'autorizzazione cloudkms.cryptoKeys.get per la chiave.

Dopo aver apportato le correzioni appropriate, invia di nuovo la richiesta per assicurarti che tutti i controlli di convalida vengano superati. Per l'esempio precedente, il controllo CMEK_VALIDATE_PERMISSIONS non restituirà più un messaggio di errore, ma avrà lo stato PASSED.

Ottenere informazioni su uno stream

Il seguente codice mostra una richiesta per recuperare informazioni su uno stream. Queste informazioni comprendono:

  • Il nome dello stream (identificatore univoco)
  • Un nome semplice per lo stream (nome visualizzato)
  • Timestamp della creazione e dell'ultimo aggiornamento dello stream
  • Informazioni sui profili di connessione di origine e destinazione associati allo stream
  • Lo stato dello stream

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

La risposta viene visualizzata nel seguente modo:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per recuperare informazioni sul tuo stream, consulta la documentazione di Google Cloud SDK.

Elenco stream

Il seguente codice mostra una richiesta per recuperare un elenco di tutti gli stream nel progetto e nella località specificati.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Per saperne di più sull'utilizzo di gcloud per recuperare informazioni su tutti i tuoi stream, consulta la documentazione di Google Cloud SDK.

Elenca gli oggetti di uno stream

Il seguente codice mostra una richiesta per recuperare informazioni su tutti gli oggetti di un flusso.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Per saperne di più sull'utilizzo di gcloud per recuperare informazioni su tutti gli oggetti del flusso, consulta la documentazione di Google Cloud SDK.

L'elenco degli oggetti restituiti potrebbe essere simile al seguente:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Per saperne di più sull'utilizzo di gcloud per elencare gli oggetti di un flusso, consulta la documentazione di Google Cloud SDK.

Avvio di un flusso

Il seguente codice mostra una richiesta per avviare uno stream.

Se utilizzi il parametro updateMask nella richiesta, nel corpo della richiesta devono essere inclusi solo i campi che specifichi. Per avviare un flusso, modifica il valore nel campo state da CREATED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per saperne di più sull'utilizzo di gcloud per avviare lo stream, consulta la documentazione di Google Cloud SDK.

Pausa di un flusso

Il seguente codice mostra una richiesta per mettere in pausa un flusso in esecuzione.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Se metti in pausa lo stream, il suo stato cambia da RUNNING a PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Per saperne di più sull'utilizzo di gcloud per mettere in pausa lo stream, consulta la documentazione di Google Cloud SDK.

Ripresa di un flusso

Il seguente codice mostra una richiesta per riprendere uno stream in pausa.

Per questo esempio, il campo specificato per il parametro updateMask è il campo state. Se riprendi lo stream, il suo stato cambia da PAUSED a RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Per saperne di più sull'utilizzo di gcloud per riprendere lo stream, consulta la documentazione di Google Cloud SDK.

Recuperare uno stream

Puoi recuperare un flusso di dati non riuscito in modo permanente utilizzando il metodo RunStream. Ogni tipo di database di origine ha una propria definizione delle operazioni di recupero dello stream possibili. Per saperne di più, consulta Recuperare uno stream.

Recuperare uno stream per un'origine MySQL o Oracle

I seguenti esempi di codice mostrano le richieste per recuperare un flusso per un'origine MySQL o Oracle da varie posizioni del file di log:

REST

Recupera un flusso dalla posizione corrente. Questa è l'opzione predefinita:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Recuperare un flusso dalla successiva posizione disponibile:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Recuperare uno stream dalla posizione più recente:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Recupera un flusso da una posizione specifica (replica basata su binlog MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi recuperare lo stream
  • POSITION: La posizione nel file di log da cui vuoi recuperare lo stream. Se non fornisci il valore, Datastream recupera lo stream dall'inizio del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Recupera un flusso da una posizione specifica (replica basata su GTID di MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Sostituisci GTID_SET con uno o più GTID singoli o intervalli di GTID da cui vuoi recuperare lo stream.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

Recupera un flusso da una posizione specifica (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero della modifica di sistema (SCN) nel file di log di ripristino da cui vuoi recuperare lo stream. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Per saperne di più sulle opzioni di recupero disponibili, vedi Recuperare uno stream.

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Recuperare uno stream per un'origine PostgreSQL

Il seguente esempio di codice mostra una richiesta di recupero di uno stream per un'origine PostgreSQL. Durante il recupero, lo stream inizia a leggere dal primo numero di sequenza di log (LSN) nello slot di replica configurato per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Se vuoi modificare lo slot di replica, aggiorna prima lo stream con il nuovo nome dello slot di replica:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Recuperare uno stream per un'origine SQL Server

I seguenti esempi di codice mostrano richieste di esempio per recuperare un flusso per un'origine SQL Server.

REST

Recuperare uno stream dalla prima posizione disponibile:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Recupera un flusso da un numero di sequenza di log preferito:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Il recupero di uno stream utilizzando gcloud non è supportato.

Avviare o riprendere uno stream da una posizione specifica

Puoi avviare un flusso o riprenderne uno in pausa da una posizione specifica per le origini MySQL e Oracle. Questo può essere utile quando vuoi eseguire il backfill utilizzando uno strumento esterno o avviare CDC da una posizione che indichi. Per un'origine MySQL, devi indicare una posizione binlog o un insieme GTID, mentre per un'origine Oracle, un numero della modifica di sistema (SCN) nel file di log di ripristino.

Il seguente codice mostra una richiesta per avviare o riprendere uno stream già creato da una posizione specifica.

Avvia o riprendi uno stream da una posizione binlog specifica (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Sostituisci quanto segue:

  • NAME_OF_THE_LOG_FILE: il nome del file di log da cui vuoi avviare lo stream.
  • POSITION: la posizione nel file di log da cui vuoi iniziare lo stream. Se non fornisci il valore, Datastream inizia a leggere dall'inizio del file.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.

Avvia o riprendi uno stream da un set GTID specifico (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Sostituisci GTID_SET con uno o più ID transazione globali singoli o intervalli di ID transazione globali da cui vuoi iniziare o riprendere lo stream.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare o riprendere uno stream, consulta la documentazione di Cloud SDK.

Avvia o riprendi un flusso da un numero della modifica di sistema specifico nel file di log di ripristino (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Sostituisci scn con il numero della modifica di sistema (SCN) nel file di log di ripristino da cui vuoi avviare lo stream. Questo campo è obbligatorio.

Ad esempio:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

L'avvio o la ripresa di uno stream da una posizione specifica utilizzando gcloud non è supportato. Per informazioni sull'utilizzo di gcloud per avviare uno stream, consulta la documentazione di Cloud SDK.

Modifica di un flusso

Il seguente codice mostra una richiesta di aggiornamento della configurazione di rotazione dei file di un flusso per ruotare il file ogni 75 MB o 45 secondi.

Per questo esempio, i campi specificati per il parametro updateMask includono i campi fileRotationMb e fileRotationInterval, rappresentati rispettivamente dai flag destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Il seguente codice mostra una richiesta per includere un file di schema tipi unificati nel percorso dei file che Datastream scrive in Cloud Storage. Di conseguenza, Datastream scrive due file: un file di dati JSON e un file di schema Avro.

Per questo esempio, il campo specificato è jsonFileFormat, rappresentato dal flag destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Il seguente codice mostra una richiesta a Datastream di replicare i dati esistenti, oltre alle modifiche in corso ai dati, dal database di origine alla destinazione.

La sezione oracleExcludedObjects del codice mostra le tabelle e gli schemi per cui è limitato il riempimento retroattivo nella destinazione.

Per questo esempio, verranno eseguito il backfill di tutte le tabelle e di tutti gli schemi, ad eccezione di tableA in schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per modificare il flusso, consulta la documentazione di Google Cloud SDK.

Avviare il backfill per un oggetto di un flusso

Un flusso in Datastream può eseguire il backfill dei dati storici, nonché trasmettere in streaming le modifiche in corso a una destinazione. Le modifiche in corso verranno sempre trasmesse in streaming da un'origine a una destinazione. Tuttavia, puoi specificare se vuoi che i dati storici vengano trasmessi in streaming.

Se vuoi che i dati storici vengano trasmessi in streaming dall'origine alla destinazione, utilizza il parametro backfillAll.

Datastream ti consente anche di trasmettere in streaming i dati storici solo per tabelle di database specifiche. A questo scopo, utilizza il parametro backfillAll ed escludi le tabelle per le quali non vuoi dati storici.

Se vuoi che nella destinazione vengano trasmessi in streaming solo i cambiamenti in corso, utilizza il parametro backfillNone. Se poi vuoi che Datastream trasmetta in streaming uno snapshot di tutti i dati esistenti dall'origine alla destinazione, devi avviare manualmente il backfill per gli oggetti che contengono questi dati.

Un altro motivo per avviare il backfill per un oggetto è se i dati non sono sincronizzati tra l'origine e la destinazione. Ad esempio, un utente può eliminare inavvertitamente i dati nella destinazione e ora i dati sono persi. In questo caso, l'avvio del backfill per l'oggetto funge da "meccanismo di ripristino" perché tutti i dati vengono trasmessi in streaming nella destinazione in un'unica operazione. Di conseguenza, i dati vengono sincronizzati tra l'origine e la destinazione.

Prima di poter avviare il backfill per un oggetto di uno stream, devi recuperare le informazioni sull'oggetto.

Ogni oggetto ha un OBJECT_ID, che lo identifica in modo univoco. Utilizzi OBJECT_ID per avviare il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Per saperne di più sull'utilizzo di gcloud per avviare il backfill per un oggetto del flusso, consulta la documentazione di Google Cloud SDK.

Arrestare il backfill per un oggetto di un flusso

Dopo aver avviato il backfill per un oggetto di un flusso, puoi arrestarlo per l'oggetto. Ad esempio, se un utente modifica uno schema di database, lo schema o i dati potrebbero essere danneggiati. Non vuoi che questo schema o questi dati vengano trasmessi in streaming alla destinazione, quindi interrompi il backfill per l'oggetto.

Puoi anche interrompere il backfill per un oggetto ai fini del bilanciamento del carico. Datastream può eseguire più backfill in parallelo. Questa operazione può aumentare il carico sull'origine. Se il carico è significativo, interrompi il backfill per ogni oggetto, quindi avvialo per gli oggetti uno alla volta.

Prima di poter interrompere il backfill per un oggetto di uno stream, devi inviare una richiesta per recuperare informazioni su tutti gli oggetti di uno stream. Ogni oggetto restituito ha un OBJECT_ID, che lo identifica in modo univoco. Utilizza OBJECT_ID per interrompere il backfill per lo stream.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Per saperne di più sull'utilizzo di gcloud per interrompere il backfill per un oggetto del flusso, consulta la documentazione di Google Cloud SDK.

Modificare il numero massimo di attività CDC simultanee

Il seguente codice mostra come impostare il numero massimo di attività CDC (Change Data Capture) simultanee per un flusso MySQL su 7.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentCdcTasks. Se imposti il valore su 7, modifichi il numero massimo di attività CDC simultanee dal valore precedente a 7. Puoi utilizzare valori compresi tra 0 e 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema di 5 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Per saperne di più sull'utilizzo di gcloud, consulta la documentazione di Google Cloud SDK.

Modificare il numero massimo di attività di backfill simultanee

Il seguente codice mostra come impostare il numero massimo di attività di backfill simultanee per un flusso MySQL su 25.

Per questo esempio, il campo specificato per il parametro updateMask è il campo maxConcurrentBackfillTasks. Se imposti il valore su 25, modifichi il numero massimo di attività di backfill simultanee dal valore precedente a 25. Puoi utilizzare valori compresi tra 0 e 50 (inclusi). Se non definisci il valore o se lo definisci come 0, per lo stream viene impostato il valore predefinito del sistema pari a 16 attività.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Per saperne di più sull'utilizzo di gcloud, consulta la documentazione di Google Cloud SDK.

Abilita lo streaming di oggetti di grandi dimensioni per le origini Oracle

Puoi attivare lo streaming di oggetti di grandi dimensioni, come oggetti binari di grandi dimensioni (BLOB), oggetti di grandi dimensioni di tipo carattere (CLOB) e oggetti di grandi dimensioni di tipo carattere nazionale (NCLOB) per i flussi con origini Oracle. Il flag streamLargeObjects consente di includere oggetti di grandi dimensioni sia nei flussi nuovi che in quelli esistenti. Il flag è impostato a livello di stream, non devi specificare le colonne dei tipi di dati di oggetti di grandi dimensioni.

L'esempio seguente mostra come creare un flusso che consente di trasmettere in streaming oggetti di grandi dimensioni.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Per saperne di più sull'utilizzo di gcloud per aggiornare uno stream, consulta la documentazione di Google Cloud SDK.

Eliminazione di un flusso

Il seguente codice mostra una richiesta di eliminazione di uno stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Per ulteriori informazioni sull'utilizzo di gcloud per eliminare lo stream, consulta la documentazione di Google Cloud SDK.

Passaggi successivi