Diese Vorlage erstellt eine Streamingpipeline, um Bigtable-Datenänderungsdatensätze zu streamen und sie mit Dataflow Runner V2 in die Vertex AI Vektorsuche zu schreiben.
Pipelineanforderungen
- Die Bigtable-Quellinstanz muss vorhanden sein.
- Die Bigtable-Quelltabelle muss vorhanden sein und für die Tabelle müssen Änderungsstreams aktiviert sein.
- Das Bigtable-Anwendungsprofil muss vorhanden sein.
- Der Indexpfad für die Vektorsuche muss vorhanden sein.
Vorlagenparameter
Erforderliche Parameter
- embeddingColumn: Der voll qualifizierte Spaltenname, in dem die Einbettungen gespeichert sind. Im Format cf:col.
- embeddingByteSize: Die Bytegröße jedes Eintrags im Einbettungsarray. Verwenden Sie 4 für Float und 8 für Double. Die Standardeinstellung ist 4.
- vectorSearchIndex: Der Vektorsuchindex, in den Änderungen gestreamt werden, im Format „projects/{projectID}/locations/{region}/indexes/{indexID}“ (keine voran- oder nachgestellten Leerzeichen). Beispiel:
projects/123/locations/us-east1/indexes/456
. - bigtableChangeStreamAppProfile: Die Bigtable-Anwendungsprofil-ID. Das Anwendungsprofil muss Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen zulassen.
- bigtableReadInstanceId: Die Bigtable-Quellinstanz-ID.
- bigtableReadTableId: Die Bigtable-Quelltabellen-ID.
Optionale Parameter
- bigtableMetadataTableTableId: Die Tabellen-ID, die zum Erstellen der Metadatentabelle verwendet wird.
- crowdingTagColumn: Der voll qualifizierte Spaltenname, in dem das Crowding-Tag gespeichert ist. Im Format cf:col.
- allowRestrictsMappings: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als
allow
-Einschränkungen verwendet werden sollen, mit ihren Aliassen. Im Format cf:col->alias. - denyRestrictsMappings: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als
deny
-Einschränkungen verwendet werden sollen, mit ihren Aliassen. Im Format cf:col->alias. - intNumericRestrictsMappings: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Ganzzahl-
numeric_restricts
verwendet werden sollen, mit ihren Aliassen. Im Format cf:col->alias. - floatNumericRestrictsMappings: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Gleitkommazahl (4 Byte)
numeric_restricts
verwendet werden sollen, mit ihren Aliassen. Im Format cf:col->alias. - doubleNumericRestrictsMappings: Die durch Kommas getrennten, vollständig qualifizierten Spaltennamen der Spalten, die als Double (8 Byte)
numeric_restricts
verwendet werden sollen, mit ihren Aliassen. Im Format cf:col->alias. - upsertMaxBatchSize: Die maximale Anzahl von Upserts zum Zwischenspeichern, bevor der Batch auf den Vektorsuchindex aktualisiert wird. Batches werden gesendet, wenn entweder „upsertBatchSize“-Datensätze bereit sind oder die Wartezeit „upsertBatchDelay“ für einen beliebigen Datensatz abgelaufen ist. Beispiel:
10
. Die Standardeinstellung ist 10. - upsertMaxBufferDuration: Die maximale Verzögerung, bevor ein Batch von Upserts an die Vektorsuche gesendet wird.Batches werden gesendet, wenn entweder „upsertBatchSize“-Datensätze bereit sind oder die Wartezeit „upsertBatchDelay“ für einen beliebigen Datensatz abgelaufen ist. Zulässige Formate sind: Ns (für Sekunden, Beispiel: 5s), Nm (für Minuten, Beispiel: 12m), Nh (für Stunden, Beispiel: 2h). Beispiel:
10s
. Die Standardeinstellung ist 10 Sekunden. - deleteMaxBatchSize: Die maximale Anzahl von Löschvorgängen, die zwischengespeichert werden sollen, bevor der Batch aus dem Vektorsuchindex gelöscht wird. Batches werden gesendet, wenn entweder „deleteBatchSize“-Datensätze bereit sind oder die Wartezeit „deleteBatchDelay“ für einen beliebigen Datensatz abgelaufen ist. Beispiel:
10
. Die Standardeinstellung ist 10. - deleteMaxBufferDuration: Die maximale Verzögerung, bevor ein Batch von Löschvorgängen an die Vektorsuche gesendet wird.Batches werden gesendet, wenn entweder „deleteBatchSize“-Datensätze bereit sind oder die Wartezeit „deleteBatchDelay“ für einen beliebigen Datensatz abgelaufen ist. Zulässige Formate sind: Ns (für Sekunden, Beispiel: 5s), Nm (für Minuten, Beispiel: 12m), Nh (für Stunden, Beispiel: 2h). Beispiel:
10s
. Die Standardeinstellung ist 10 Sekunden. - dlqDirectory: Der Pfad zum Speichern nicht verarbeiteter Datensätze mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
- bigtableChangeStreamMetadataInstanceId: Die Metadateninstanz-ID des Bigtable-Änderungsstreams. Die Standardeinstellung ist leer.
- bigtableChangeStreamMetadataTableTableId: Die ID der Metadatentabelle des Bigtable-Änderungsstream-Connectors. Wenn nicht angegeben, wird während der Pipelineausführung automatisch eine Metadatentabelle für Bigtable-Änderungsstreams erstellt. Die Standardeinstellung ist leer.
- bigtableChangeStreamCharset: Der Zeichensatzname des Bigtable-Änderungsstreams. Standardmäßig ist dies auf UTF8 eingestellt.
- bigtableChangeStreamStartTimestamp: Der Startzeitstempel (https://tools.ietf.org/html/rfc3339) (einschließlich), der zum Lesen von Änderungsstreams verwendet wird. Beispiel:
2022-05-05T07:59:59Z
Die Standardeinstellung ist der Zeitstempel für die Startzeit der Pipeline. - bigtableChangeStreamIgnoreColumnFamilies: Eine durch Kommas getrennte Liste von Änderungen an den Namen der Spaltenfamilien, die ignoriert werden sollen. Die Standardeinstellung ist leer.
- bigtableChangeStreamIgnoreColumns: Eine durch Kommas getrennte Liste von Änderungen der Spaltennamen, die ignoriert werden sollen. Beispiel: „cf1:col1,cf2:col2“ Die Standardeinstellung ist leer.
- bigtableChangeStreamName: Ein eindeutiger Name für die Client-Pipeline. Mit diesem Parameter wird die Verarbeitung ab dem Punkt fortgesetzt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird ein automatisch generierter Name verwendet. Den verwendeten Wert finden Sie in den Dataflow-Joblogs.
- bigtableChangeStreamResume: Wenn dieser Wert auf
true
gesetzt ist, setzt eine neue Pipeline die Verarbeitung ab dem Punkt fort, an dem eine zuvor ausgeführte Pipeline mit demselbenbigtableChangeStreamName
-Wert gestoppt wurde. Wenn die Pipeline mit dem angegebenenbigtableChangeStreamName
-Wert noch nie ausgeführt wurde, wird keine neue Pipeline gestartet. Wenn dieser Wert auffalse
gesetzt ist, wird eine neue Pipeline gestartet. Wenn eine Pipeline mit demselbenbigtableChangeStreamName
-Wert bereits für die angegebene Quelle ausgeführt wurde, wird keine neue Pipeline gestartet. Die Standardeinstellung istfalse
. - bigtableReadChangeStreamTimeoutMs: Das Zeitlimit für Bigtable-ReadChangeStream-Anfragen in Millisekunden.
- bigtableReadProjectId: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable Change Streams to Vector Search templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud-CLI
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Ersetzen Sie Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
EMBEDDING_COLUMN
: die EinbettungsspalteEMBEDDING_BYTE_SIZE
: die Bytegröße des Einbettungsarrays. Kann 4 oder 8 sein.VECTOR_SEARCH_INDEX
: der Indexpfad der VektorsucheBIGTABLE_CHANGE_STREAM_APP_PROFILE
: die Bigtable-Anwendungsprofil-IDBIGTABLE_READ_INSTANCE_ID
: die Bigtable-Quellinstanz-IDBIGTABLE_READ_TABLE_ID
: die Bigtable-Quelltabellen-ID.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
EMBEDDING_COLUMN
: die EinbettungsspalteEMBEDDING_BYTE_SIZE
: die Bytegröße des Einbettungsarrays. Kann 4 oder 8 sein.VECTOR_SEARCH_INDEX
: der Indexpfad der VektorsucheBIGTABLE_CHANGE_STREAM_APP_PROFILE
: die Bigtable-Anwendungsprofil-IDBIGTABLE_READ_INSTANCE_ID
: die Bigtable-Quellinstanz-IDBIGTABLE_READ_TABLE_ID
: die Bigtable-Quelltabellen-ID