Auf dieser Seite erfahren Sie, wie Sie die Datastream API für folgende Aufgaben verwenden:
- Streams erstellen
- Informationen zu Streams und Streamobjekten abrufen
- Streams aktualisieren – durch Starten, Anhalten, Fortsetzen, Ändern sowie Initiieren und Beenden eines Backfills für Streamobjekte
- Dauerhaft fehlgeschlagene Streams wiederherstellen
- Streaming großer Objekte für Oracle-Streams aktivieren
- Streams löschen
Sie können die Datastream API auf zwei Arten verwenden. Sie können REST API-Aufrufe ausführen oder die Google Cloud CLI verwenden.
Allgemeine Informationen zur Verwendung von gcloud
für die Verwaltung von Datastream-Streams finden Sie unter gcloud Datastream-Streams.
Stream erstellen
In diesem Abschnitt erfahren Sie, wie Sie einen Stream erstellen, mit dem Daten von der Quelle an ein Ziel übertragen werden. Die folgenden Beispiele sind nicht vollständig, sondern sollen bestimmte Funktionen von Datastream hervorheben. Verwenden Sie diese Beispiele zusammen mit der Referenzdokumentation der Datastream API, um Ihren spezifischen Anwendungsfall zu erfüllen.
In diesem Abschnitt werden die folgenden Anwendungsfälle behandelt:
- Streams von Oracle zu Cloud Storage
- Von MySQL zu BigQuery streamen
- Von PostgreSQL zu BigQuery streamen
- Objekte definieren, die in den Stream aufgenommen werden sollen
- Alle im Stream enthaltenen Objekte per Backfill einfügen
- Objekte aus dem Stream ausschließen
- Objekte vom Backfill ausschließen
- CMEK für die Verschlüsselung inaktiver Daten definieren
- Schreibmodus für einen Stream definieren
Beispiel 1: Bestimmte Objekte in BigQuery streamen
In diesem Beispiel erfahren Sie, wie Sie Folgendes tun:
- Von MySQL zu BigQuery streamen
- Eine Reihe von Objekten in den Stream aufnehmen
- Schreibmodus für den Stream als „Nur anhängen“ definieren
- Backfill für alle im Stream enthaltenen Objekte
Im Folgenden sehen Sie eine Anfrage zum Abrufen aller Tabellen aus schema1
und zwei bestimmter Tabellen aus schema2
: tableA
und tableC
. Die Ereignisse werden in ein Dataset in BigQuery geschrieben.
Die Anfrage enthält nicht den Parameter customerManagedEncryptionKey
. Daher wird das interne Schlüsselverwaltungssystem Google Cloud anstelle von CMEK zum Verschlüsseln Ihrer Daten verwendet.
Der Parameter backfillAll
, der mit dem Backfill (oder Snapshot) der Verlaufsdaten verknüpft ist, ist auf ein leeres Wörterbuch ({}
) gesetzt. Das bedeutet, dass Datastream die Verlaufsdaten aus allen Tabellen im Stream zurücksetzt.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Weitere Informationen zum Erstellen eines Streams mit gcloud
finden Sie in der Google Cloud SDK-Dokumentation.
Beispiel 2: Bestimmte Objekte aus einem Stream mit einer PostgreSQL-Quelle ausschließen
In diesem Beispiel erfahren Sie, wie Sie Folgendes tun:
- Von PostgreSQL zu BigQuery streamen
- Objekte aus dem Stream ausschließen
- Objekte vom Backfill ausschließen
Der folgende Code zeigt eine Anfrage zum Erstellen eines Streams, der zum Übertragen von Daten aus einer PostgreSQL-Quelldatenbank in BigQuery verwendet wird. Wenn Sie einen Stream aus einer PostgreSQL-Quellendatenbank erstellen, müssen Sie in Ihrer Anfrage zwei zusätzliche, PostgreSQL-spezifische Felder angeben:
replicationSlot
: Ein Replikationsslot ist eine Voraussetzung für die Konfiguration einer PostgreSQL-Datenbank für die Replikation. Sie müssen für jeden Stream einen Replikationsslot erstellen.publication
: Eine Publikation ist eine Gruppe von Tabellen, aus denen Sie Änderungen replizieren möchten. Der Name der Publikation muss in der Datenbank vorhanden sein, bevor ein Stream gestartet werden kann. Die Publikation muss mindestens die Tabellen enthalten, die in derincludeObjects
-Liste des Streams angegeben sind.
Der Parameter backfillAll
, der mit dem Ausführen des Backfills (oder Snapshots) von Verlaufsdaten verknüpft ist, ist so festgelegt, dass eine Tabelle ausgeschlossen wird.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Weitere Informationen zum Erstellen eines Streams mit gcloud
finden Sie in der Google Cloud SDK-Dokumentation.
Beispiel 3: Schreibmodus „Nur anhängen“ für einen Stream angeben
Beim Streaming zu BigQuery können Sie den Schreibmodus definieren: merge
oder appendOnly
. Weitere Informationen finden Sie unter Schreibmodus konfigurieren.
Wenn Sie in Ihrer Anfrage zum Erstellen eines Streams keinen Schreibmodus angeben, wird der Standardmodus merge
verwendet.
In der folgenden Anfrage wird gezeigt, wie Sie den appendOnly
-Modus definieren, wenn Sie einen MySQL-zu-BigQuery-Stream erstellen.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Weitere Informationen zum Erstellen eines Streams mit gcloud
finden Sie in der Google Cloud SDK-Dokumentation.
Beispiel 4: Streaming zu einem Cloud Storage-Ziel
In diesem Beispiel erfahren Sie, wie Sie Folgendes tun:
- Von Oracle zu Cloud Storage streamen
- Objekte definieren, die in den Stream aufgenommen werden sollen
- CMEK für die Verschlüsselung inaktiver Daten definieren
In der folgenden Anfrage wird gezeigt, wie ein Stream erstellt wird, der die Ereignisse in einen Bucket in Cloud Storage schreibt.
In dieser Beispielanfrage werden die Ereignisse im JSON-Ausgabeformat geschrieben und alle 100 MB oder 30 Sekunden wird eine neue Datei erstellt. Dabei werden die Standardwerte von 50 MB und 60 Sekunden überschrieben.
Für das JSON-Format können Sie:
Fügen Sie eine Unified Types-Schemadatei in den Pfad ein. Dadurch schreibt Datastream zwei Dateien in Cloud Storage: eine JSON-Datendatei und eine Avro-Schemadatei. Die Schemadatei hat denselben Namen wie die Datendatei, aber mit der Erweiterung
.schema
.gzip-Komprimierung aktivieren, damit Datastream die Dateien komprimiert, die in Cloud Storage geschrieben werden.
Mit dem Parameter backfillNone
wird in der Anfrage angegeben, dass nur laufende Änderungen ohne Backfill an das Ziel gestreamt werden.
In der Anfrage wird der Parameter „vom Kunden verwalteter Verschlüsselungsschlüssel“ angegeben, mit dem Sie die Schlüssel steuern können, die zum Verschlüsseln inaktiver Daten in einem Google Cloud -Projekt verwendet werden. Der Parameter bezieht sich auf den CMEK, mit dem Datastream Daten verschlüsselt, die von der Quelle an das Ziel gestreamt werden. Außerdem wird der Schlüsselbund für den CMEK-Schlüssel angegeben.
Weitere Informationen zu Schlüsselringen finden Sie unter Cloud KMS-Ressourcen. Weitere Informationen zum Schutz Ihrer Daten mit Verschlüsselungsschlüsseln finden Sie unter Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Weitere Informationen zum Erstellen eines Streams mit gcloud
finden Sie in der Google Cloud SDK-Dokumentation.
Definition eines Streams validieren
Bevor Sie einen Stream erstellen, können Sie seine Definition validieren. So können Sie dafür sorgen, dass alle Validierungsprüfungen erfolgreich sind und der Stream beim Erstellen ordnungsgemäß ausgeführt wird.
Dabei wird Folgendes geprüft:
- Ob die Quelle ordnungsgemäß konfiguriert ist, damit Datastream Daten von ihr streamen kann.
- Ob er sowohl mit der Quelle als auch mit dem Ziel verbunden werden kann.
- Die End-to-End-Konfiguration des Streams.
Wenn du einen Stream validieren möchtest, füge der URL vor dem Anfragetext &validate_only=true
hinzu:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Nachdem Sie diese Anfrage gestellt haben, werden die Validierungsprüfungen angezeigt, die Datastream für Ihre Quelle und Ihr Ziel ausführt, und ob die Prüfungen bestanden werden oder fehlschlagen. Bei nicht bestandenen Validierungsprüfungen werden Informationen dazu angezeigt, warum die Prüfung fehlgeschlagen ist und wie sie das Problem beheben können.
Angenommen, Sie haben einen vom Kunden verwalteten Verschlüsselungsschlüssel (Customer-Managed Encryption Key, CMEK), mit dem Sie Daten verschlüsseln möchten, die von der Quelle zum Ziel gestreamt werden. Im Rahmen der Validierung des Streams prüft Datastream, ob der Schlüssel vorhanden ist und ob Datastream die Berechtigungen zur Verwendung des Schlüssels hat. Wenn eine dieser Bedingungen nicht erfüllt ist, wird beim Validieren des Streams die folgende Fehlermeldung zurückgegeben:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Prüfen Sie, ob der von Ihnen angegebene Schlüssel vorhanden ist und ob das Datastream-Dienstkonto die Berechtigung cloudkms.cryptoKeys.get
für den Schlüssel hat.
Nachdem Sie die entsprechenden Korrekturen vorgenommen haben, stellen Sie die Anfrage noch einmal, um sicherzustellen, dass alle Validierungsprüfungen bestanden wurden. Im obigen Beispiel gibt die CMEK_VALIDATE_PERMISSIONS
-Prüfung keine Fehlermeldung mehr zurück, sondern den Status PASSED
.
Informationen zu einem Stream abrufen
Der folgende Code zeigt eine Anfrage zum Abrufen von Informationen zu einem Stream. Zu diesen Daten gehören:
- Der Name des Streams (eindeutige Kennung)
- Ein nutzerfreundlicher Name für den Stream (Anzeigename)
- Zeitstempel für die Erstellung und letzte Aktualisierung des Streams
- Informationen zu den Quell- und Zielverbindungsprofilen, die mit dem Stream verknüpft sind
- Der Status des Streams
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Die Antwort sieht so aus:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Abrufen von Informationen zu Ihrem Stream finden Sie hier.
Streams auflisten
Der folgende Code zeigt eine Anfrage zum Abrufen einer Liste aller Streams im angegebenen Projekt und an dem angegebenen Standort.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Abrufen von Informationen zu allen Streams finden Sie hier.
Objekte eines Streams auflisten
Der folgende Code zeigt eine Anfrage zum Abrufen von Informationen zu allen Objekten eines Streams.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Abrufen von Informationen zu allen Objekten in Ihrem Stream finden Sie hier.
Die Liste der zurückgegebenen Objekte sieht in etwa so aus:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Auflisten von Objekten eines Streams finden Sie hier.
Stream starten
Der folgende Code zeigt eine Anfrage zum Starten eines Streams.
Bei Verwendung des Parameters updateMask
in der Anfrage müssen nur die von Ihnen angegebenen Felder im Text der Anfrage enthalten sein. Wenn Sie einen Stream starten möchten, ändern Sie den Wert im Feld state
von CREATED
in RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Starten eines Streams finden Sie hier.
Stream pausieren
Der folgende Code zeigt eine Anfrage zum Anhalten eines ausgeführten Streams.
In diesem Beispiel ist für den Parameter updateMask
das Feld state
angegeben. Wenn Sie den Stream pausieren, ändern Sie seinen Status von RUNNING
in PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Pausieren von Streams finden Sie hier.
Stream fortsetzen
Der folgende Code zeigt eine Anfrage zum Fortsetzen eines angehaltenen Streams.
In diesem Beispiel ist für den Parameter updateMask
das Feld state
angegeben. Wenn Sie den Stream fortsetzen, ändern Sie seinen Status vonPAUSED
in RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Fortsetzen eines Streams finden Sie hier.
Stream wiederherstellen
Sie können einen dauerhaft fehlgeschlagenen Stream mit der Methode RunStream
wiederherstellen. Für jeden Quelldatenbanktyp gibt es eine eigene Definition der möglichen Streamwiederherstellungsvorgänge. Weitere Informationen finden Sie unter Stream wiederherstellen.
Stream für eine MySQL- oder Oracle-Quelle wiederherstellen
Die folgenden Codebeispiele zeigen Anfragen zum Wiederherstellen eines Streams für eine MySQL- oder Oracle-Quelle an verschiedenen Positionen in der Protokolldatei:
REST
Stream ab der aktuellen Position wiederherstellen Das ist die Standardoption:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Stream ab der nächsten verfügbaren Position wiederherstellen:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Stream von der letzten Position wiederherstellen:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Stream von einer bestimmten Position wiederherstellen (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ersetzen Sie Folgendes:
- NAME_OF_THE_LOG_FILE: Der Name der Protokolldatei, aus der Sie den Stream wiederherstellen möchten
- POSITION: Die Position in der Protokolldatei, von der aus Sie den Stream wiederherstellen möchten. Wenn Sie den Wert nicht angeben, stellt Datastream den Stream vom Anfang der Datei wieder her.
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Stream von einer bestimmten Position wiederherstellen (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Weitere Informationen zu den verfügbaren Wiederherstellungsoptionen findest du unter Stream wiederherstellen.
gcloud
Das Wiederherstellen eines Streams mit gcloud
wird nicht unterstützt.
Stream für eine PostgreSQL-Quelle wiederherstellen
Das folgende Codebeispiel zeigt eine Anfrage zum Wiederherstellen eines Streams für eine PostgreSQL-Quelle. Während der Wiederherstellung beginnt der Stream mit dem Lesen bei der ersten Logsequenznummer (LSN) im für den Stream konfigurierten Replikationsslot.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Wenn Sie den Replikationsslot ändern möchten, aktualisieren Sie zuerst den Stream mit dem neuen Namen des Replikationsslots:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Das Wiederherstellen eines Streams mit gcloud
wird nicht unterstützt.
Stream für eine SQL Server-Quelle wiederherstellen
Die folgenden Codebeispiele zeigen Beispielanfragen zum Wiederherstellen eines Streams für eine SQL-Server-Quelle.
REST
Stream ab der ersten verfügbaren Position wiederherstellen:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
So stellen Sie einen Stream anhand einer bevorzugten Logsequenznummer wieder her:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Das Wiederherstellen eines Streams mit gcloud
wird nicht unterstützt.
Stream an einer bestimmten Position starten oder fortsetzen
Bei MySQL- und Oracle-Quellen können Sie einen Stream starten oder einen pausierten Stream ab einer bestimmten Position fortsetzen. Das kann hilfreich sein, wenn Sie ein Backfill mit einem externen Tool ausführen oder die CDC von einer von Ihnen angegebenen Position aus starten möchten. Bei einer MySQL-Quelle müssen Sie eine Binlog-Position angeben, bei einer Oracle-Quelle eine Systemänderungsnummer (SCN) in der Redo-Protokolldatei.
Der folgende Code zeigt eine Anfrage zum Starten oder Fortsetzen eines bereits erstellten Streams von einer bestimmten Position aus.
Stream von einer bestimmten Binlog-Position starten oder fortsetzen (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ersetzen Sie Folgendes:
- NAME_OF_THE_LOG_FILE: Der Name der Protokolldatei, von der aus Sie den Stream starten möchten.
- POSITION: Die Position in der Protokolldatei, von der aus Sie den Stream starten möchten. Wenn Sie keinen Wert angeben, beginnt Datastream mit dem Lesen am Anfang der Datei.
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
Das Starten oder Fortsetzen eines Streams an einer bestimmten Position mit gcloud
wird nicht unterstützt. Informationen zum Starten oder Fortsetzen eines Streams mit gcloud
finden Sie in der Cloud SDK-Dokumentation.
Stream ab einer bestimmten Systemänderungsnummer in der Redo-Logdatei (Oracle) starten oder fortsetzen:
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Beispiel:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Das Starten oder Fortsetzen eines Streams an einer bestimmten Position mit gcloud
wird nicht unterstützt. Informationen zum Starten eines Streams mit gcloud
finden Sie in der Cloud SDK-Dokumentation.
Stream ändern
Der folgende Code zeigt eine Anfrage zum Aktualisieren der Dateirotationskonfiguration eines Streams, um die Datei alle 75 MB oder 45 Sekunden zu rotieren.
In diesem Beispiel werden für den Parameter updateMask
die Felder fileRotationMb
und fileRotationInterval
angegeben, die durch die Flags destinationConfig.gcsDestinationConfig.fileRotationMb
bzw. destinationConfig.gcsDestinationConfig.fileRotationInterval
dargestellt sind.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Im folgenden Code wird eine Anfrage zum Einfügen einer Unified Types-Schemadatei in den Pfad der Dateien gezeigt, die Datastream in Cloud Storage schreibt. Dadurch schreibt Datastream zwei Dateien: eine JSON-Datendatei und eine Avro-Schemadatei.
In diesem Beispiel ist das Feld jsonFileFormat
, dargestellt durch das Flag destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Der folgende Code zeigt eine Anfrage an Datastream, um vorhandene Daten sowie laufende Änderungen an den Daten aus der Quelldatenbank in das Ziel zu replizieren.
Im Abschnitt oracleExcludedObjects
des Codes werden die Tabellen und Schemas aufgeführt, die nicht für das Backfill in das Ziel verwendet werden dürfen.
Für dieses Beispiel werden alle Tabellen und Schemas außer tableA in schema3 ausgefüllt.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Ändern Ihres Streams finden Sie hier.
Backfill für ein Objekt eines Streams initiieren
Ein Stream in Datastream kann ein Backfill auf Verlaufsdaten ausführen und laufende Änderungen an ein Ziel streamen. Laufende Änderungen werden immer von einer Quelle an ein Ziel gestreamt. Sie können jedoch festlegen, ob Verlaufsdaten gestreamt werden sollen.
Wenn Sie möchten, dass Verlaufsdaten von der Quelle an das Ziel gestreamt werden, verwenden Sie den Parameter backfillAll
.
Mit Datastream können Sie auch Verlaufsdaten nur für bestimmte Datenbanktabellen streamen. Verwenden Sie dazu den Parameter backfillAll
und schließen Sie die Tabellen aus, für die Sie keine Verlaufsdaten abrufen möchten.
Wenn nur laufende Änderungen an das Ziel gestreamt werden sollen, verwenden Sie den Parameter backfillNone
. Wenn Sie möchten, dass Datastream einen Snapshot aller vorhandenen Daten von der Quelle zum Ziel streamt, müssen Sie den Backfill für die Objekte, die diese Daten enthalten, manuell initiieren.
Ein weiterer Grund zum Initiieren eines Backfills für ein Objekt ist, wenn die Daten zwischen der Quelle und dem Ziel nicht synchron sind. Beispielsweise kann ein Nutzer versehentlich Daten im Ziel löschen und die Daten gehen nun verloren. In diesem Fall dient das Initiieren des Backfills für das Objekt als "Rücksetzungsmechanismus", da alle Daten auf einmal an das Ziel gestreamt werden. Folglich werden die Daten zwischen der Quelle und dem Ziel synchronisiert.
Bevor Sie den Backfill für ein Objekt eines Streams initiieren können, müssen Sie Informationen zum Objekt abrufen.
Jedes Objekt hat einen OBJECT_ID, der das Objekt eindeutig identifiziert. Mit OBJECT_ID starten Sie den Backfill für den Stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Weitere Informationen dazu, wie du mit gcloud
einen Backfill für ein Objekt deines Streams initiierst, findest du in der Google Cloud SDK-Dokumentation.
Backfill für ein Objekt eines Streams beenden
Nach dem Initialisieren eines Backfills für ein Objekt eines Streams können Sie den Backfill für das Objekt beenden. Wenn ein Nutzer beispielsweise ein Datenbankschema ändert, können das Schema oder die Daten beschädigt sein. Da dieses Schema oder die Daten nicht an das Ziel gestreamt werden sollen, beenden Sie den Backfill für das Objekt.
Sie können den Backfill für ein Objekt auch zum Load-Balancing beenden. Datastream kann mehrere Backfills parallel ausführen. Dies kann eine zusätzliche Last für die Quelle bedeuten. Wenn die Last erheblich ist, beenden Sie den Backfill für jedes Objekt und initiieren dann den Backfill für die Objekte nacheinander.
Bevor Sie den Backfill für ein Objekt eines Streams beenden können, müssen Sie eine Anfrage zum Abrufen von Informationen zu allen Objekten eines Streams stellen. Jedes zurückgegebene Objekt hat einen OBJECT_ID, der das Objekt eindeutig identifiziert. Mit OBJECT_ID beenden Sie den Backfill für den Stream.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Weitere Informationen dazu, wie Sie mit gcloud
den Backfill für ein Objekt Ihres Streams beenden, finden Sie hier.
Maximale Anzahl gleichzeitiger CDC-Aufgaben ändern
Im folgenden Code wird gezeigt, wie die maximale Anzahl gleichzeitiger CDC-Aufgaben (Change Data Capture) für einen MySQL-Stream auf 7 festgelegt wird.
In diesem Beispiel ist für den Parameter updateMask
das Feld maxConcurrentCdcTasks
angegeben. Wenn Sie den Wert auf 7 festlegen, ändern Sie die Anzahl der maximalen gleichzeitigen CDC-Aufgaben vom vorherigen Wert auf 7. Sie können Werte zwischen 0 und 50 (einschließlich) verwenden. Wenn Sie den Wert nicht definieren oder auf 0 festlegen, wird für den Stream der Standardwert von 5 Aufgaben festgelegt.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Weitere Informationen zur Verwendung von gcloud
finden Sie hier.
Maximale Anzahl gleichzeitiger Backfill-Aufgaben ändern
Im folgenden Code wird gezeigt, wie die maximale Anzahl gleichzeitiger Backfill-Aufgaben für einen MySQL-Stream auf 25 festgelegt wird.
In diesem Beispiel ist für den Parameter updateMask
das Feld maxConcurrentBackfillTasks
angegeben. Wenn Sie den Wert auf 25 festlegen, ändern Sie die maximale Anzahl gleichzeitiger Backfill-Aufgaben vom vorherigen Wert auf 25. Sie können Werte zwischen 0 und 50 (einschließlich) verwenden. Wenn Sie den Wert nicht definieren oder auf 0 festlegen, wird für den Stream der Standardwert von 16 Aufgaben festgelegt.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Weitere Informationen zur Verwendung von gcloud
finden Sie hier.
Streaming großer Objekte für Oracle-Quellen aktivieren
Sie können das Streaming großer Objekte wie Binary Large Objects (BLOB
), Character Large Objects (CLOB
) und National Character Large Objects (NCLOB
) für Streams mit Oracle-Quellen aktivieren. Mit dem Flag streamLargeObjects
können Sie große Objekte sowohl in neue als auch in vorhandene Streams aufnehmen. Das Flag wird auf Streamebene festgelegt. Sie müssen die Spalten mit Large Object-Datentypen nicht angeben.
Im folgenden Beispiel wird gezeigt, wie Sie einen Stream erstellen, mit dem Sie große Objekte streamen können.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Weitere Informationen zum Aktualisieren eines Streams mit gcloud
finden Sie in der Google Cloud SDK-Dokumentation.
Stream löschen
Der folgende Code zeigt eine Anfrage zum Löschen eines Streams.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Weitere Informationen zur Verwendung von gcloud
zum Löschen eines Streams finden Sie hier.
Nächste Schritte
- Weitere Informationen zum Verwalten von Verbindungsprofilen mit der Datastream API
- Weitere Informationen zum Verwalten von Konfigurationen für private Verbindungen mit der Datastream API
- Weitere Informationen zur Verwendung der Datastream API finden Sie in der Referenzdokumentation.