Vorlage „Spanner-Änderungsstreams für Quelldatenbank“

Streamingpipeline Liest Daten aus Spanner-Änderungsstreams und schreibt sie in eine Quelle.

Vorlagenparameter

Parameter Beschreibung
changeStreamName Der Name des Spanner-Änderungsstreams, aus dem die Pipeline liest.
instanceId Der Name der Spanner-Instanz, in der sich der Änderungsstream befindet.
databaseId Der Name der Spanner-Datenbank, die vom Änderungsstream überwacht wird.
spannerProjectId Der Name des Spanner-Projekts.
metadataInstance Die Instanz, in der die Metadaten gespeichert werden, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.
metadataDatabase Die Datenbank zum Speichern der Metadaten, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.
sourceShardsFilePath Pfad zu einer Cloud Storage-Datei mit Informationen zum Verbindungsprofil für Quellshards.
startTimestamp Optional: Der Startzeitstempel für das Lesen von Änderungen. Die Standardeinstellung ist leer.
endTimestamp Optional: Der Endzeitstempel für das Lesen von Änderungen. Wenn kein Zeitstempel angegeben ist, wird der Wert unbegrenzt gelesen. Die Standardeinstellung ist leer.
shadowTablePrefix Optional: Das Präfix zum Benennen von Schattentabellen. Standardeinstellung: shadow_.
sessionFilePath Optional: Sitzungspfad in Cloud Storage, der Zuordnungsinformationen von HarbourBridge enthält.
filtrationMode Optional: Filtermodus. Gibt an, wie bestimmte Datensätze anhand eines bestimmten Kriteriums gelöscht werden sollen. Folgende Modi werden unterstützt: none (kein Filter), forward_migration (Filter für Datensätze, die mit der Pipeline für die vorwärtsgerichtete Migration geschrieben wurden). Die Standardeinstellung ist forward_migration.
shardingCustomJarPath Optional: Speicherort der benutzerdefinierten JAR-Datei in Cloud Storage, die die benutzerdefinierte Logik zum Abrufen der Shard-ID enthält. Wenn Sie diesen Parameter festlegen, müssen Sie auch den Parameter shardingCustomJarPath festlegen. Die Standardeinstellung ist leer.
shardingCustomClassName Optional: Vollständig qualifizierter Klassenname mit der benutzerdefinierten Shard-ID-Implementierung. Wenn shardingCustomJarPath angegeben ist, ist dieser Parameter erforderlich. Die Standardeinstellung ist leer.
shardingCustomParameters Optional: String mit allen benutzerdefinierten Parametern, die an die benutzerdefinierte Sharding-Klasse übergeben werden sollen. Die Standardeinstellung ist leer.
sourceDbTimezoneOffset Optional: Der Zeitzonen-Versatz von UTC für die Quelldatenbank. Beispielwert: +10:00. Die Standardeinstellung ist +00:00.
dlqGcsPubSubSubscription Optional: Das Pub/Sub-Abo, das in einer Cloud Storage-Benachrichtigungsrichtlinie für das DLQ-Wiederholverzeichnis verwendet wird, wenn der reguläre Modus genutzt wird. Der Name muss das Format projects/<project-id>/subscriptions/<subscription-name> haben. Wenn er festgelegt ist, werden „deadLetterQueueDirectory“ und „dlqRetryMinutes“ ignoriert.
skipDirectoryName Optional: Datensätze, die bei der umgekehrten Replikation übersprungen wurden, werden in dieses Verzeichnis geschrieben. Der Standardverzeichnisname ist „skip“.
maxShardConnections Optional: Die maximale Anzahl von Verbindungen, die ein bestimmter Shard akzeptieren kann. Die Standardeinstellung ist 10000.
deadLetterQueueDirectory Optional: Der Pfad, der beim Speichern der Fehlerwarteschlangenausgabe verwendet wird. Der Standardpfad ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs.
dlqMaxRetryCount Optional: Die maximale Anzahl der Wiederholungsversuche über die Dead-Letter-Queue bei vorübergehenden Fehlern. Die Standardeinstellung ist 500.
runMode Optional: Der Ausführungsmodus. Unterstützte Werte: regular, retryDLQ. Standardeinstellung: regular. Geben Sie retryDLQ an, um nur Einträge in der Dead-Letter-Queue mit schwerwiegenden Fehlern noch einmal zu senden.
dlqRetryMinutes Optional: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Der Standardwert ist 10.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Spanner Change Streams to Source Database templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud-CLI

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • CHANGE_STREAM_NAME: Der Name des Änderungsstreams, aus dem gelesen werden soll
  • INSTANCE_ID: Die Cloud Spanner-Instanz-ID.
  • DATABASE_ID: die Cloud Spanner-Datenbank-ID
  • SPANNER_PROJECT_ID: die Cloud Spanner-Projekt-ID.
  • METADATA_INSTANCE: Die Cloud Spanner-Instanz, in der Metadaten beim Lesen aus Änderungsstreams gespeichert werden sollen
  • METADATA_DATABASE: die Cloud Spanner-Datenbank zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • SOURCE_SHARDS_FILE_PATH: Pfad zur GCS-Datei mit den Details zum Quellshard

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • CHANGE_STREAM_NAME: Der Name des Änderungsstreams, aus dem gelesen werden soll
  • INSTANCE_ID: Die Cloud Spanner-Instanz-ID.
  • DATABASE_ID: die Cloud Spanner-Datenbank-ID
  • SPANNER_PROJECT_ID: die Cloud Spanner-Projekt-ID.
  • METADATA_INSTANCE: Die Cloud Spanner-Instanz, in der Metadaten beim Lesen aus Änderungsstreams gespeichert werden sollen
  • METADATA_DATABASE: die Cloud Spanner-Datenbank zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • SOURCE_SHARDS_FILE_PATH: Pfad zur GCS-Datei mit den Details zum Quellshard