Bigtable-Änderungsstreams zur Vorlage für die Vektorsuche

Diese Vorlage erstellt eine Streamingpipeline, um Bigtable-Datenänderungsdatensätze zu streamen und sie mit Dataflow Runner V2 in die Vertex AI Vektorsuche zu schreiben.

Pipelineanforderungen

  • Die Bigtable-Quellinstanz muss vorhanden sein.
  • Die Bigtable-Quelltabelle muss vorhanden sein und für die Tabelle müssen Änderungsstreams aktiviert sein.
  • Das Bigtable-Anwendungsprofil muss vorhanden sein.
  • Der Indexpfad für die Vektorsuche muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
embeddingColumn Der voll qualifizierte Spaltenname, in dem die Einbettungen gespeichert sind. Im Format cf:col.
embeddingByteSize Die Bytegröße jedes Eintrags im Einbettungsarray. Verwenden Sie 4 für Gleitkommazahl und 8 für Double. Die Standardeinstellung ist 4.
vectorSearchIndex Der Vektorsuchindex, in den Änderungen gestreamt werden, im Format "projects/{projectID}/locations/{region}/indexes/{indexID}" (keine voran- oder nachgestellten Leerzeichen). Beispiel: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile Das Anwendungsprofil, das zur Unterscheidung von Arbeitslasten in Bigtable verwendet wird.
bigtableReadInstanceId Die ID der Bigtable-Instanz, die die Tabelle enthält.
bigtableReadTableId Die Bigtable-Tabelle, aus der gelesen werden soll.
bigtableMetadataTableTableId Optional: ID für die erstellte Metadatentabelle. Wenn nichts festgelegt ist, generiert Bigtable eine ID.
crowdingTagColumn Optional: Der voll qualifizierte Spaltenname, in dem das Crowding-Tag gespeichert ist, im Format cf:col.
allowRestrictsMappings Optional: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als allow-Einschränkungen verwendet werden sollen, sowie ihre Aliasse. Jeder Spaltenname muss das Format cf:col->alias haben.
denyRestrictsMappings Optional: Die durch Kommas getrennten, voll qualifizierten Spaltennamen der Spalten, die für deny-Einschränkungen verwendet werden sollen, mit ihren Aliassen. Jeder Spaltenname muss das Format cf:col->alias haben.
intNumericRestrictsMappings Optional: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Ganzzahl numeric_restricts verwendet werden sollen, mit ihren Aliassen. Jeder Spaltenname muss das Format cf:col->alias haben.
floatNumericRestrictsMappings Optional: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Gleitkommazahl (4 Byte) numeric_restricts verwendet werden sollen, mit ihren Aliassen. Jeder Spaltenname muss das Format cf:col->alias haben
doubleNumericRestrictsMappings Optional: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Double (8 Byte) numeric_restricts verwendet werden sollen, mit ihren Aliassen. Jeder Spaltenname muss das Format cf:col->alias haben.
upsertMaxBatchSize Optional: Die maximale Anzahl von Upserts zum Zwischenspeichern, bevor der Batch auf den Vektorsuchindex aktualisiert wird. Batches werden gesendet, wenn entweder upsertBatchSize-Datensätze bereit sind. Beispiel: 10.
upsertMaxBufferDuration Optional: Die maximale Verzögerung, bevor ein Batch von Upserts an die Vektorsuche gesendet wird. Batches werden gesendet, wenn entweder upsertBatchSize-Datensätze bereit sind. Zulässige Formate sind: Ns für Sekunden (Beispiel: 5s), Nm für Minuten (Beispiel: 12m) und Nh für Stunden (Beispiel: 2h). Standard: 10s.
deleteMaxBatchSize Optional: Die maximale Anzahl von Löschvorgängen, die zwischengespeichert werden sollen, bevor der Batch aus dem Vektorsuchindex gelöscht wird. Batches werden gesendet, wenn entweder deleteBatchSize-Datensätze bereit sind. Beispiel: 10
deleteMaxBufferDuration Optional: Die maximale Verzögerung, bevor ein Batch von Löschvorgängen an die Vektorsuche gesendet wird. Batches werden gesendet, wenn entweder deleteBatchSize-Datensätze bereit sind. Zulässige Formate sind: Ns für Sekunden (Beispiel: 5s), Nm für Minuten (Beispiel: 12m) und Nh für Stunden (Beispiel: 2h). Standard: 10s.
dlqDirectory Optional: Der Dateipfad zum Speichern nicht verarbeiteter Einträge mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist für die meisten Szenarien geeignet.
bigtableChangeStreamMetadataInstanceId Optional: Die Bigtable-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll. Die Standardeinstellung ist leer.
bigtableChangeStreamMetadataTableTableId Optional: Die Bigtable-Änderung streamt die Metadatentabellen-ID des Connectors, die verwendet werden sollen. Wenn nicht angegeben, wird während des Pipelineablaufs automatisch eine Metadatentabelle für Bigtable-Änderungsstreams erstellt. Die Standardeinstellung ist leer.
bigtableChangeStreamCharset Optional: Die Bigtable-Änderungsstreams codieren den Zeichensatznamen beim Lesen von Werten und Spaltenqualifizierern. Die Standardeinstellung ist UTF-8.
bigtableChangeStreamStartTimestamp Optional: Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird (https://tools.ietf.org/html/rfc3339). Beispiel: 2022-05-05T07:59:59Z Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline.
bigtableChangeStreamIgnoreColumnFamilies Optional: Eine durch Kommas getrennte Liste von Änderungen an den Namen von Spaltenfamilien, die nicht erfasst werden. Die Standardeinstellung ist leer.
bigtableChangeStreamIgnoreColumns Optional: Eine durch Kommas getrennte Liste von Änderungen an Spaltennamen, die nicht erfasst werden. Die Standardeinstellung ist leer.
bigtableChangeStreamName Optional: Ein eindeutiger Name für die Clientpipeline. Mit diesem Parameter wird die Verarbeitung ab dem Punkt fortgesetzt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird ein automatisch generierter Name verwendet. Den verwendeten Wert finden Sie in den Dataflow-Joblogs.
bigtableChangeStreamResume

Optional: Wenn dieser Wert auf "true" gesetzt ist, setzt eine neue Pipeline die Verarbeitung ab dem Punkt fort, an dem eine zuvor ausgeführte Pipeline mit demselben Namen beendet wurde. Wenn eine Pipeline mit diesem Namen in der Vergangenheit noch nie ausgeführt wurde, kann die neue Pipeline nicht gestartet werden. Verwenden Sie den Parameter bigtableChangeStreamName, um die Pipelinelinie anzugeben.

Wenn dieser Wert auf „false“ gesetzt ist, wird eine neue Pipeline gestartet. Wenn eine Pipeline mit demselben Namen wie bigtableChangeStreamName bereits in der Vergangenheit für die angegebene Quelle ausgeführt wurde, kann die neue Pipeline nicht starten kann.

Die Standardeinstellung ist "false".

bigtableReadProjectId Optional: Projekt, aus dem Bigtable-Daten gelesen werden sollen. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable Change Streams to Vector Search templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud-CLI

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • EMBEDDING_COLUMN: die Einbettungsspalte
  • EMBEDDING_BYTE_SIZE: die Bytegröße des Einbettungsarrays. Kann 4 oder 8 sein.
  • VECTOR_SEARCH_INDEX: der Indexpfad der Vektorsuche
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: die Bigtable-Anwendungsprofil-ID
  • BIGTABLE_READ_INSTANCE_ID: die Bigtable-Quellinstanz-ID
  • BIGTABLE_READ_TABLE_ID: die Bigtable-Quelltabellen-ID.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • EMBEDDING_COLUMN: die Einbettungsspalte
  • EMBEDDING_BYTE_SIZE: die Bytegröße des Einbettungsarrays. Kann 4 oder 8 sein.
  • VECTOR_SEARCH_INDEX: der Indexpfad der Vektorsuche
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: die Bigtable-Anwendungsprofil-ID
  • BIGTABLE_READ_INSTANCE_ID: die Bigtable-Quellinstanz-ID
  • BIGTABLE_READ_TABLE_ID: die Bigtable-Quelltabellen-ID