Vorlage „Datastream zu MySQL oder PostgreSQL (Stream)“

Die Vorlage „Datastream to SQL“ ist eine Streaming-Pipeline, die Datastream-Daten liest und in jede MySQL- oder PostgreSQL-Datenbank repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert diese Daten in SQL-Replikattabellen.

Die Vorlage unterstützt die Datendefinitionssprache (DDL) nicht und erwartet, dass alle Tabellen bereits in der Datenbank vorhanden sind. Die Replikation verwendet zustandsorientierte Transformationen in Dataflow, um veraltete Daten zu filtern und für die Konsistenz von Daten zu sorgen. Wenn beispielsweise eine neuere Version einer Zeile bereits verarbeitet wurde, wird eine später ankommende Version dieser Zeile ignoriert. Die ausgeführte Datenbearbeitungssprache (DML) versucht, die Ziel- oder Quelldaten so gut wie möglich zu replizieren. Für die ausgeführten DML-Anweisungen gelten die folgenden Regeln:

  • Wenn ein Primärschlüssel vorhanden ist, verwenden Einfügungs- und Aktualisierungsvorgänge eine Upsert-Syntax (d. h. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Wenn Primärschlüssel vorhanden sind, werden Löschvorgänge als Lösch-DML repliziert.
  • Wenn kein Primärschlüssel vorhanden ist, werden sowohl Einfüge- als auch Aktualisierungsvorgänge in die Tabelle eingefügt.
  • Wenn keine Primärschlüssel vorhanden sind, werden Löschvorgänge ignoriert.

Wenn Sie die Oracle-zu-Postgres-Dienstprogramme verwenden, fügen Sie ROWID in SQL als Primärschlüssel hinzu, wenn keine vorhanden sind.

Pipelineanforderungen

  • Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
  • Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
  • Eine PostgreSQL-Datenbank wurde mit dem erforderlichen Schema konfiguriert.
  • Der Netzwerkzugriff zwischen Dataflow-Workern und PostgreSQL ist eingerichtet.

Vorlagenparameter

Erforderliche Parameter

  • inputFilePattern : Der Dateispeicherort für die Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
  • databaseHost : Der SQL-Host, auf dem eine Verbindung hergestellt werden soll.
  • databaseUser : Der SQL-Nutzer mit allen erforderlichen Berechtigungen zum Schreiben in alle Tabellen in der Replikation.
  • databasePassword : Das Passwort für den SQL-Nutzer.

Optionale Parameter

  • gcsPubSubSubscription : Das Pub/Sub-Abo mit Datastream-Dateibenachrichtigungen. Beispiel: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat : Das Format der von Datastream generierten Ausgabedatei. Beispiel: avrooder json Die Standardeinstellung ist avro.
  • streamName : Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Der Standardwert ist {_metadata_stream}.
  • rfcStartDateTime : Die Start-DateTime, die für den Abruf aus Cloud Storage verwendet wird (https://tools.ietf.org/html/rfc3339) Die Standardeinstellung ist: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl : Stamm-URL der Datastream API. Die Standardeinstellung ist https://datastream.googleapis.com/.
  • databaseType : Der Datenbanktyp, in den geschrieben werden soll (z. B. Postgres). Die Standardeinstellung ist "postgres".
  • databasePort : Der SQL-Datenbankport, zu dem eine Verbindung hergestellt werden soll. Der Standardwert ist 5432.
  • databaseName : Der Name der SQL-Datenbank, zu der eine Verbindung hergestellt werden soll. Der Standardwert ist postgres.
  • schemaMap : Eine Zuordnung von Schlüssel/Wert-Paaren, die zur Festlegung von Änderungen des Schemanamens verwendet werden (d. h. old_name:new_name,CaseError:case_error). Die Standardeinstellung ist leer.
  • customConnectionString : Optionaler Verbindungsstring, der anstelle des Standarddatenbankstrings verwendet wird.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to SQL templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

Nächste Schritte