Vorlage "Datastream zu Spanner"

Die Vorlage "Datastream für Spanner" ist eine Streamingpipeline, die Datastream-Ereignisse aus einem Cloud Storage-Bucket liest und in eine Spanner-Datenbank schreibt. Sie ist für die Datenmigration von Datastream-Quellen zu Spanner vorgesehen.

Alle für die Migration erforderlichen Tabellen müssen vor der Ausführung der Vorlage in der Spanner-Zieldatenbank vorhanden sein. Daher muss die Schemamigration von einer Quelldatenbank zum Spanner-Ziel abgeschlossen sein, bevor Sie Daten migrieren können. Daten können in den Tabellen vor der Migration vorhanden sein. Diese Vorlage leitet keine Änderungen des Datastream-Schemas an die Spanner-Datenbank weiter.

Die Datenkonsistenz wird erst am Ende der Migration garantiert, wenn alle Daten in Spanner geschrieben wurden. Zum Speichern von Reihenfolgeinformationen für jeden in Spanner geschriebenen Datensatz erstellt diese Vorlage eine zusätzliche Tabelle (sogenannte Schattentabelle) für jede Tabelle in der Spanner-Datenbank. Dadurch wird die Konsistenz am Ende der Migration sichergestellt. Die Schattentabellen werden nach der Migration nicht gelöscht und können am Ende der Migration zur Validierung verwendet werden.

Alle Fehler, die während des Vorgangs auftreten, z. B. nicht übereinstimmende Schemas, fehlerhafte JSON-Dateien oder Fehler, die sich aus der Ausführung von Transformationen ergeben, werden in einer Fehlerwarteschlange aufgezeichnet. Die Fehlerwarteschlange ist ein Cloud Storage-Ordner, in dem alle Datastream-Ereignisse gespeichert werden, bei denen Fehler aufgetreten sind, zusammen mit der Fehlerursache im Textformat. Die Fehler können vorübergehend oder dauerhaft sein und in den entsprechenden Cloud Storage-Ordnern in der Fehlerwarteschlange gespeichert werden. Bei diesen vorübergehenden Fehler erfolgt automatisch eine Wiederholung, bei dauerhaften Fehlern dagegen nicht. Bei dauerhaften Fehlern haben Sie die Möglichkeit, Korrekturen an den Änderungsereignissen vorzunehmen und diese in den Bucket für Wiederholungen zu verschieben, während die Vorlage ausgeführt wird.

Pipelineanforderungen

  • Ein Datastream-Stream mit dem Status Wird ausgeführt oder Nicht gestartet.
  • Ein Cloud Storage-Bucket, in dem Datastream-Ereignisse repliziert werden.
  • Spanner-Datenbank mit vorhandenen Tabellen. Diese Tabellen können leer sein oder Daten enthalten.

Vorlagenparameter

Erforderliche Parameter

  • instanceId: Die Spanner-Instanz, in der die Änderungen repliziert werden.
  • databaseId: Die Spanner-Datenbank, in der die Änderungen repliziert werden.

Optionale Parameter

  • inputFilePattern: Der Cloud Storage-Dateispeicherort, der die zu replizierenden Datastream-Dateien enthält. In der Regel ist dies der Stammpfad für einen Stream. Die Unterstützung für diese Funktion wurde deaktiviert.
  • inputFileFormat: Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json. Die Standardeinstellung ist avro.
  • sessionFilePath: Pfad der Sitzungsdatei in Cloud Storage, die Zuordnungsinformationen von HarbourBridge enthält.
  • projectId: Die Spanner-Projekt-ID.
  • spannerHost: Der Cloud Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Beispiel: https://batch-spanner.googleapis.com. Die Standardeinstellung ist https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: Das Pub/Sub-Abo, das in einer Cloud Storage-Benachrichtigungsrichtlinie verwendet wird. Verwenden Sie für den Namen das Format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: Der Name oder die Vorlage für den Stream, der nach Schemainformationen und dem Quelltyp abgefragt werden soll.
  • shadowTablePrefix: Das Präfix zum Benennen von Schattentabellen. Standardeinstellung: shadow_
  • shouldCreateShadowTables: Dieses Flag gibt an, ob Schattentabellen in der Cloud Spanner-Datenbank erstellt werden müssen. Die Standardeinstellung ist true.
  • rfcStartDateTime: Das Startdatum, das zum Abrufen von Daten aus Cloud Storage verwendet werden soll (https://tools.ietf.org/html/rfc3339). Die Standardeinstellung ist: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: Die Anzahl der gleichzeitig zu lesenden DataStream-Dateien. Die Standardeinstellung ist 30.
  • deadLetterQueueDirectory: Der Dateipfad, der beim Speichern der Fehlerwarteschlangenausgabe verwendet wird. Der Standard-Dateipfad ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs.
  • dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Die Standardeinstellung ist 10.
  • dlqMaxRetryCount: Die maximale Anzahl der Wiederholungsversuche über die DLQ bei vorübergehenden Fehlern. Die Standardeinstellung ist 500.
  • dataStreamRootUrl: Stamm-URL der Datastream API. Standardeinstellung: https://datastream.googleapis.com/.
  • datastreamSourceType: Dies ist der Typ der Quelldatenbank, zu der Datastream eine Verbindung herstellt. Beispiel: mysql/oracle. Muss festgelegt werden, wenn ohne einen tatsächlich laufenden Datastream getestet wird.
  • roundJsonDecimals: Wenn dieses Flag gesetzt ist, werden die Dezimalwerte in JSON-Spalten auf eine Zahl gerundet, die ohne Genauigkeitsverlust gespeichert werden kann. Die Standardeinstellung ist "false".
  • runMode: Dies ist der Ausführungsmodus, entweder „normal“ oder „mit retryDLQ“. Standardeinstellung: „normal“
  • transformationContextFilePath: Pfad der Transformationskontextdatei im Cloud-Speicher, der zum Ausfüllen von Daten verwendet wird, die für bei Migrationen durchgeführten Transformationen verwendet werden. Beispiel: Shard-ID zu Datenbankname, um die Datenbank zu identifizieren, aus der eine Zeile migriert wurde.
  • directoryWatchDurationInMinutes: Die Dauer, für die die Pipeline ein Verzeichnis in GCS zyklisch abfragen soll. Die Datenstream-Ausgabedateien sind in einer Verzeichnisstruktur angeordnet, die Ereignis-Zeitstempel nach Minuten gruppiert. Dieser Parameter sollte ungefähr der maximalen Verzögerung entsprechen, die zwischen dem Auftreten eines Ereignisses in der Quelldatenbank und dem Schreiben desselben Ereignisses durch Datastream in GCS auftreten kann. 99,9 Perzentil = 10 Minuten Die Standardeinstellung ist 10.
  • spannerPriority: Die Anfragepriorität für Cloud Spanner-Aufrufe. Der Wert muss einer der folgenden sein: [HIGH,MEDIUM,LOW]. Standardmäßig ist HIGH festgelegt.
  • dlqGcsPubSubSubscription: Das Pub/Sub-Abo, das in einer Cloud Storage-Benachrichtigungsrichtlinie für das DLQ-Wiederholverzeichnis verwendet wird, wenn der reguläre Modus genutzt wird. Verwenden Sie für den Namen das Format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Wenn diese Option festgelegt ist, werden „deadLetterQueueDirectory“ und „dlqRetryMinutes“ ignoriert.
  • transformationJarPath: Speicherort der benutzerdefinierten JAR-Datei in Cloud Storage, die die benutzerdefinierte Transformationslogik für die Verarbeitung von Einträgen bei der Vorwärts-Migration enthält. Die Standardeinstellung ist leer.
  • transformationClassName: Vollständig qualifizierter Klassenname mit der benutzerdefinierten Transformationslogik. Dieses Feld ist ein Pflichtfeld, wenn „transformationJarPath“ angegeben ist. Die Standardeinstellung ist leer.
  • transformationCustomParameters: String mit allen benutzerdefinierten Parametern, die an die benutzerdefinierte Transformationsklasse übergeben werden sollen. Die Standardeinstellung ist leer.
  • filteredEventsDirectory: Dies ist der Dateipfad, unter dem die über die benutzerdefinierte Transformation gefilterten Ereignisse gespeichert werden. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
  • shardingContextFilePath: Der Pfad der Sharding-Kontextdatei im Cloud-Speicher wird verwendet, um die Shard-ID in der Spanner-Datenbank für jeden Quell-Shard anzugeben.Er hat das Format Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: Dies sind die Überschreibungen des Tabellennamens von der Quelle zu Spanner. Sie werden im folgenden Format geschrieben: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. In diesem Beispiel wird die Tabelle „Sänger“ den Sängern und die Tabelle „Alben“ den Aufnahmen zugeordnet. Beispiel: [{Singers, Vocalists}, {Albums, Records}]. Die Standardeinstellung ist leer.
  • columnOverrides: Dies sind die Überschreibungen des Spaltennamens von der Quelle zu Spanner. Sie werden im folgenden Format geschrieben: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Die SourceTableName sollte sowohl im Quell- als auch im Spanner-Paar gleich bleiben. Verwenden Sie „tableOverrides“, um Tabellennamen zu überschreiben.Im Beispiel wird „SingerName“ in der Tabelle „Singers“ mit „TalentName“ und „AlbumName“ in der Tabelle „Albums“ mit „RecordName“ abgeglichen. Beispiel: [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. Die Standardeinstellung ist leer.
  • schemaOverridesFilePath: Datei, in der die Überschreibungen des Tabellen- und Spaltennamens von der Quelle zu Spanner angegeben sind. Die Standardeinstellung ist leer.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to Spanner templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Ihre Spanner-Instanz.
  • CLOUDSPANNER_DATABASE: Ihre Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Ihre Spanner-Instanz.
  • CLOUDSPANNER_DATABASE: Ihre Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

Nächste Schritte