Ce modèle crée un pipeline de traitement en flux continu pour diffuser des enregistrements de modification de données Bigtable et les écrire dans Vertex AI Vector Search à l'aide de l'exécuteur Dataflow V2.
Conditions requises pour ce pipeline
- L'instance source Bigtable doit exister.
- La table source Bigtable doit exister et les flux de modifications doivent être activés dans la table.
- Le profil d'application Bigtable doit exister.
- Le chemin d'accès à l'index Vector Search doit exister.
Paramètres de modèle
Paramètres obligatoires
- embeddingColumn : nom complet de la colonne dans laquelle les embeddings sont stockés. Au format cf:col.
- embeddingByteSize : taille en octets de chaque entrée du tableau des embeddings. Utilisez 4 pour Float et 8 pour Double. La valeur par défaut est 4.
- vectorSearchIndex : index Vector Search où les modifications seront diffusées, au format "projects/{projectID}/locations/{region}/indexes/{indexID}" (sans espaces de début ni de fin). Par exemple,
projects/123/locations/us-east1/indexes/456
. - bigtableChangeStreamAppProfile : ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
- bigtableReadInstanceId : ID de l'instance Bigtable source.
- bigtableReadTableId : ID de la table Bigtable source.
Paramètres facultatifs
- bigtableMetadataTableTableId : ID de table utilisé pour créer la table de métadonnées.
- crowdingTagColumn : nom complet de la colonne dans laquelle le tag de regroupement est stocké. Au format cf:col.
- allowRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme restrictions
allow
, avec leurs alias. Au format cf:col->alias. - denyRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme restrictions
deny
, avec leurs alias. Au format cf:col->alias. - intNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs d'entier
numeric_restricts
, avec leurs alias. Au format cf:col->alias. - floatNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs flottantes (4 octets)
numeric_restricts
, avec leurs alias. Au format cf:col->alias. - doubleNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs
numeric_restricts
(8 octets), avec leurs alias. Au format cf:col->alias. - upsertMaxBatchSize : nombre maximal d'opérations d'upsert à mettre en mémoire tampon avant d'effectuer l'opération d'upsert sur le lot dans l'index Vector Search. Les lots sont envoyés lorsque le nombre d'enregistrements upsertBatchSize est atteint ou lorsque le délai upsertBatchDelay est écoulé pour un enregistrement. Exemple :
10
La valeur par défaut est 10. - upsertMaxBufferDuration : délai maximal avant l'envoi d'un lot d'opérations d'upsert à Vector Search.Les lots sont envoyés lorsque upsertBatchSize enregistrements sont prêts ou lorsqu'un enregistrement a attendu upsertBatchDelay. Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h). Exemple :
10s
La valeur par défaut est de 10 secondes. - deleteMaxBatchSize : nombre maximal de suppressions à mettre en mémoire tampon avant de supprimer le lot de l'index Vector Search. Les lots sont envoyés lorsque deleteBatchSize enregistrements sont prêts ou lorsqu'un enregistrement attend depuis deleteBatchDelay. Exemple :
10
La valeur par défaut est 10. - deleteMaxBufferDuration : délai maximal avant l'envoi d'un lot de suppressions à Vector Search.Les lots sont envoyés lorsque deleteBatchSize enregistrements sont prêts ou lorsqu'un enregistrement a attendu le délai deleteBatchDelay. Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h). Exemple :
10s
La valeur par défaut est de 10 secondes. - dlqDirectory : chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
- bigtableChangeStreamMetadataInstanceId : ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
- bigtableChangeStreamMetadataTableTableId : ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
- bigtableChangeStreamCharset : nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF8.
- bigtableChangeStreamStartTimestamp : code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple,
2022-05-05T07:59:59Z
. La valeur par défaut est l'horodatage de l'heure de début du pipeline. - bigtableChangeStreamIgnoreColumnFamilies : liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
- bigtableChangeStreamIgnoreColumns : liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. Exemple : "cf1:col1,cf2:col2". La valeur par défaut est vide.
- bigtableChangeStreamName : nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
- bigtableChangeStreamResume : lorsque ce paramètre est défini sur
true
, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeurbigtableChangeStreamName
s'est arrêté. Si le pipeline avec la valeurbigtableChangeStreamName
donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini surfalse
, un nouveau pipeline démarre. Si un pipeline avec la même valeurbigtableChangeStreamName
a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut estfalse
. - bigtableReadChangeStreamTimeoutMs : délai avant expiration (en millisecondes) des requêtes Bigtable ReadChangeStream.
- bigtableReadProjectId : ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable Change Streams to Vector Search template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud CLI
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
EMBEDDING_COLUMN
: colonne "Embedding".EMBEDDING_BYTE_SIZE
: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.VECTOR_SEARCH_INDEX
: chemin d'accès de l'index Vector Search.BIGTABLE_CHANGE_STREAM_APP_PROFILE
: ID de profil d'application Bigtable.BIGTABLE_READ_INSTANCE_ID
: ID de l'instance Bigtable source.BIGTABLE_READ_TABLE_ID
: ID de la table Bigtable source.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
EMBEDDING_COLUMN
: colonne "Embedding".EMBEDDING_BYTE_SIZE
: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.VECTOR_SEARCH_INDEX
: chemin d'accès de l'index Vector Search.BIGTABLE_CHANGE_STREAM_APP_PROFILE
: ID de profil d'application Bigtable.BIGTABLE_READ_INSTANCE_ID
: ID de l'instance Bigtable source.BIGTABLE_READ_TABLE_ID
: ID de la table Bigtable source.