Die Vorlage „Datastream to SQL“ ist eine Streaming-Pipeline, die Datastream-Daten liest und in jede MySQL- oder PostgreSQL-Datenbank repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert diese Daten in SQL-Replikattabellen.
Die Vorlage unterstützt die Datendefinitionssprache (DDL) nicht und erwartet, dass alle Tabellen bereits in der Datenbank vorhanden sind. Die Replikation verwendet zustandsorientierte Transformationen in Dataflow, um veraltete Daten zu filtern und für die Konsistenz von Daten zu sorgen. Wenn beispielsweise eine neuere Version einer Zeile bereits verarbeitet wurde, wird eine später ankommende Version dieser Zeile ignoriert. Die ausgeführte Datenbearbeitungssprache (DML) versucht, die Ziel- oder Quelldaten so gut wie möglich zu replizieren. Für die ausgeführten DML-Anweisungen gelten die folgenden Regeln:
- Wenn ein Primärschlüssel vorhanden ist, verwenden Einfügungs- und Aktualisierungsvorgänge eine Upsert-Syntax (d. h.
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Wenn Primärschlüssel vorhanden sind, werden Löschvorgänge als Lösch-DML repliziert.
- Wenn kein Primärschlüssel vorhanden ist, werden sowohl Einfüge- als auch Aktualisierungsvorgänge in die Tabelle eingefügt.
- Wenn keine Primärschlüssel vorhanden sind, werden Löschvorgänge ignoriert.
Wenn Sie die Oracle-zu-Postgres-Dienstprogramme verwenden, fügen Sie ROWID
in SQL als Primärschlüssel hinzu, wenn keine vorhanden sind.
Pipelineanforderungen
- Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
- Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
- Eine PostgreSQL-Datenbank wurde mit dem erforderlichen Schema konfiguriert.
- Der Netzwerkzugriff zwischen Dataflow-Workern und PostgreSQL ist eingerichtet.
Vorlagenparameter
Erforderliche Parameter
- inputFilePattern: Der Speicherort für die Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
- databaseHost: Der SQL-Host, auf dem eine Verbindung hergestellt werden soll.
- databaseUser: Der SQL-Nutzer mit allen erforderlichen Berechtigungen zum Schreiben in alle Tabellen in der Replikation.
- databasePassword: Das Passwort für den SQL-Nutzer.
Optionale Parameter
- gcsPubSubSubscription: Das Pub/Sub-Abo mit Benachrichtigungen zu Datastream-Dateien. Beispiel:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
. - inputFileFormat: Das Format der von Datastream generierten Ausgabedatei. Beispiel:
avro
oderjson
Die Standardeinstellung istavro
. - streamName: Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Der Standardwert ist
{_metadata_stream}
. - rfcStartDateTime: Das Startdatum, das zum Abrufen von Daten aus Cloud Storage verwendet werden soll (https://tools.ietf.org/html/rfc3339). Die Standardeinstellung ist: 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: Stamm-URL der Datastream API. Standardeinstellung: https://datastream.googleapis.com/.
- databaseType: Der Datenbanktyp, in den geschrieben werden soll (z. B. Postgres). Standardeinstellung: „postgres“.
- databasePort: Der SQL-Datenbankport, zu dem eine Verbindung hergestellt werden soll. Der Standardwert ist
5432
. - databaseName: Der Name der SQL-Datenbank, zu der eine Verbindung hergestellt werden soll. Der Standardwert ist
postgres
. - schemaMap: Eine Zuordnung von Schlüsseln/Werten, die zum Festlegen von Änderungen am Schemanamen verwendet wird (z. B. „alter_name:new_name“, „CaseError:case_error“). Die Standardeinstellung ist leer.
- customConnectionString: Optionaler Verbindungsstring, der anstelle des Standarddatenbankstrings verwendet wird.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to SQL templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel:projects/my-project-id/subscriptions/my-subscription-id
DATABASE_HOST
: Ihre SQL-Host-IP-AdresseDATABASE_USER
: Ihr SQL-NutzerDATABASE_PASSWORD
: Ihr SQL-Passwort
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel:projects/my-project-id/subscriptions/my-subscription-id
DATABASE_HOST
: Ihre SQL-Host-IP-AdresseDATABASE_USER
: Ihr SQL-NutzerDATABASE_PASSWORD
: Ihr SQL-Passwort
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.