Vorlage „Datastream zu MySQL oder PostgreSQL (Stream)“

Die Vorlage „Datastream to SQL“ ist eine Streaming-Pipeline, die Datastream-Daten liest und in jede MySQL- oder PostgreSQL-Datenbank repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert diese Daten in SQL-Replikattabellen.

Die Vorlage unterstützt die Datendefinitionssprache (DDL) nicht und erwartet, dass alle Tabellen bereits in der Datenbank vorhanden sind. Die Replikation verwendet zustandsorientierte Transformationen in Dataflow, um veraltete Daten zu filtern und für die Konsistenz von Daten zu sorgen. Wenn beispielsweise eine neuere Version einer Zeile bereits verarbeitet wurde, wird eine später ankommende Version dieser Zeile ignoriert. Die ausgeführte Datenbearbeitungssprache (DML) versucht, die Ziel- oder Quelldaten so gut wie möglich zu replizieren. Für die ausgeführten DML-Anweisungen gelten die folgenden Regeln:

  • Wenn ein Primärschlüssel vorhanden ist, verwenden Einfügungs- und Aktualisierungsvorgänge eine Upsert-Syntax (d. h. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Wenn Primärschlüssel vorhanden sind, werden Löschvorgänge als Lösch-DML repliziert.
  • Wenn kein Primärschlüssel vorhanden ist, werden sowohl Einfüge- als auch Aktualisierungsvorgänge in die Tabelle eingefügt.
  • Wenn keine Primärschlüssel vorhanden sind, werden Löschvorgänge ignoriert.

Wenn Sie die Oracle-zu-Postgres-Dienstprogramme verwenden, fügen Sie ROWID in SQL als Primärschlüssel hinzu, wenn keine vorhanden sind.

Pipelineanforderungen

  • Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
  • Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
  • Eine PostgreSQL-Datenbank wurde mit dem erforderlichen Schema konfiguriert.
  • Der Netzwerkzugriff zwischen Dataflow-Workern und PostgreSQL ist eingerichtet.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
gcsPubSubSubscription Das Pub/Sub-Abo mit Datastream-Dateibenachrichtigungen, z. B. projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
databaseHost Der SQL-Host, auf dem eine Verbindung hergestellt werden soll.
databaseUser Der SQL-Nutzer mit allen erforderlichen Berechtigungen zum Schreiben in alle Tabellen in der Replikation.
databasePassword Das Passwort für den jeweiligen SQL-Nutzer.
databasePort (Optional) Der SQL-Datenbankport, zu dem eine Verbindung hergestellt werden soll. Standardeinstellung: 5432.
databaseName (Optional) Der Name der SQL-Datenbank, zu der eine Verbindung hergestellt werden soll. Standardeinstellung: "postgres".
streamName (Optional) Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Standardeinstellung: {_metadata_stream}.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to SQL templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

Nächste Schritte