Ce modèle crée un pipeline de traitement en flux continu pour diffuser des enregistrements de modification de données Bigtable et les écrire dans Vertex AI Vector Search à l'aide de l'exécuteur Dataflow V2.
Conditions requises pour ce pipeline
- L'instance source Bigtable doit exister.
- La table source Bigtable doit exister et les flux de modifications doivent être activés.
- Le profil d'application Bigtable doit exister.
- Le chemin d'accès à l'index Vector Search doit exister.
Paramètres de modèle
Paramètre | Description |
---|---|
embeddingColumn |
Nom complet de la colonne dans laquelle les embeddings sont stockés. Au format cf:col. |
embeddingByteSize |
Taille en octets de chaque entrée du tableau des embeddings. Utilisez 4 pour les valeurs flottantes et 8 pour les valeurs doubles. La valeur par défaut est 4 . |
vectorSearchIndex |
Index Vector Search où les modifications seront diffusées, au format "projects/{projectID}/locations/{region}/indexes/{indexID}" (sans espaces de début ni de fin). Par exemple : projects/123/locations/us-east1/indexes/456 . |
bigtableChangeStreamAppProfile |
Profil d'application utilisé pour distinguer les charges de travail dans Bigtable. |
bigtableReadInstanceId |
ID de l'instance Bigtable qui contient la table. |
bigtableReadTableId |
Table Bigtable à partir de laquelle lire. |
bigtableMetadataTableTableId |
Facultatif: ID de la table de métadonnées créée. Si cette valeur n'est pas définie, Bigtable génère un ID. |
crowdingTagColumn |
Facultatif : Nom complet de la colonne dans laquelle le tag de regroupement est stocké, au format cf:col . |
allowRestrictsMappings |
Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions allow , avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias . |
denyRestrictsMappings |
Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions deny , avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias . |
intNumericRestrictsMappings |
Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme entiers numeric_restricts , avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias . |
floatNumericRestrictsMappings |
Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs flottantes (4 octets) numeric_restricts , avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias . |
doubleNumericRestrictsMappings |
Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs doubles (8 octets) numeric_restricts , avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias . |
upsertMaxBatchSize |
Facultatif : Nombre maximal d'opérations d'upsert à mettre en mémoire tampon avant d'effectuer l'opération d'upsert sur le lot dans l'index Vector Search. Les lots sont envoyés lorsqu'il y a upsertBatchSize enregistrements prêts.
Exemple : 10 . |
upsertMaxBufferDuration |
Facultatif : Délai maximal avant l'envoi d'un lot d'opérations d'upsert à Vector Search. Les lots sont envoyés lorsqu'il y a au moins upsertBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s . |
deleteMaxBatchSize |
Facultatif : Nombre maximal de suppressions à mettre en mémoire tampon avant de supprimer le lot de l'index Vector Search.
Les lots sont envoyés lorsqu'il y a au moins deleteBatchSize enregistrements prêts.
Exemple : 10 . |
deleteMaxBufferDuration |
Facultatif : Délai maximal avant l'envoi d'un lot de suppressions à Vector Search. Les lots sont envoyés lorsqu'il y a deleteBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s . |
dlqDirectory |
Facultatif : Chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. La valeur par défaut convient à la plupart des scénarios. |
bigtableChangeStreamMetadataInstanceId |
Facultatif : Instance Bigtable à utiliser pour la table de métadonnées du connecteur de flux de modifications. La valeur par défaut est vide. |
bigtableChangeStreamMetadataTableTableId |
Facultatif : ID de table de métadonnées du connecteur de flux de modifications Bigtable à utiliser. Si aucune valeur n'est fournie, une table des métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant le flux de pipeline. La valeur par défaut est vide. |
bigtableChangeStreamCharset |
Facultatif : Nom du charset de flux de modifications Bigtable lors de la lecture des valeurs et des qualificatifs de colonnes. La valeur par défaut est UTF-8. |
bigtableChangeStreamStartTimestamp |
Facultatif : Date et heure de début (incluses) à utiliser pour lire les flux de modifications (https://tools.ietf.org/html/rfc3339). Par exemple, 2020-01-16T16:56:00.000Z. La valeur par défaut est l'horodatage du démarrage du pipeline. |
bigtableChangeStreamIgnoreColumnFamilies |
Facultatif : Liste des modifications de noms de familles de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide. |
bigtableChangeStreamIgnoreColumns |
Facultatif : Liste des modifications de noms de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide. |
bigtableChangeStreamName |
Facultatif : nom unique pour le pipeline client. Ce paramètre vous permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée. |
bigtableChangeStreamResume |
Facultatif : Lorsque ce paramètre est défini sur "true", un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec le même nom s'est arrêté. Si un pipeline portant ce nom n'a jamais été exécuté par le passé, le nouveau pipeline ne démarre pas.
Utilisez le paramètre Lorsque ce paramètre est défini sur "false", un nouveau pipeline est démarré. Si un pipeline portant le même nom que Valeur par défaut : "false". |
bigtableReadProjectId |
Facultatif: Projet à partir duquel lire les données Bigtable. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté. |
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable Change Streams to Vector Search template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud CLI
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
EMBEDDING_COLUMN
: colonne "Embedding".EMBEDDING_BYTE_SIZE
: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.VECTOR_SEARCH_INDEX
: chemin d'accès de l'index Vector Search.BIGTABLE_CHANGE_STREAM_APP_PROFILE
: ID de profil d'application Bigtable.BIGTABLE_READ_INSTANCE_ID
: ID de l'instance Bigtable source.BIGTABLE_READ_TABLE_ID
: ID de la table Bigtable source.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
EMBEDDING_COLUMN
: colonne "Embedding".EMBEDDING_BYTE_SIZE
: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.VECTOR_SEARCH_INDEX
: chemin d'accès de l'index Vector Search.BIGTABLE_CHANGE_STREAM_APP_PROFILE
: ID de profil d'application Bigtable.BIGTABLE_READ_INSTANCE_ID
: ID de l'instance Bigtable source.BIGTABLE_READ_TABLE_ID
: ID de la table Bigtable source.