Modèle de flux de modifications Bigtable vers Vector Search

Ce modèle crée un pipeline de traitement en flux continu pour diffuser des enregistrements de modification de données Bigtable et les écrire dans Vertex AI Vector Search à l'aide de l'exécuteur Dataflow V2.

Conditions requises pour ce pipeline

  • L'instance source Bigtable doit exister.
  • La table source Bigtable doit exister et les flux de modifications doivent être activés dans la table.
  • Le profil d'application Bigtable doit exister.
  • Le chemin d'accès à l'index Vector Search doit exister.

Paramètres de modèle

Paramètres obligatoires

  • embeddingColumn : nom complet de la colonne dans laquelle les embeddings sont stockés. Au format cf:col.
  • embeddingByteSize : taille en octets de chaque entrée du tableau des embeddings. Utilisez 4 pour Float et 8 pour Double. La valeur par défaut est 4.
  • vectorSearchIndex : index Vector Search où les modifications seront diffusées, au format "projects/{projectID}/locations/{region}/indexes/{indexID}" (sans espaces de début ni de fin). Par exemple, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile : ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
  • bigtableReadInstanceId : ID de l'instance Bigtable source.
  • bigtableReadTableId : ID de la table Bigtable source.

Paramètres facultatifs

  • bigtableMetadataTableTableId : ID de table utilisé pour créer la table de métadonnées.
  • crowdingTagColumn : nom complet de la colonne dans laquelle le tag de regroupement est stocké. Au format cf:col.
  • allowRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme restrictions allow, avec leurs alias. Au format cf:col->alias.
  • denyRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme restrictions deny, avec leurs alias. Au format cf:col->alias.
  • intNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs d'entier numeric_restricts, avec leurs alias. Au format cf:col->alias.
  • floatNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs flottantes (4 octets) numeric_restricts, avec leurs alias. Au format cf:col->alias.
  • doubleNumericRestrictsMappings : noms de colonnes complets, séparés par des virgules, pour les colonnes à utiliser comme valeurs numeric_restricts (8 octets), avec leurs alias. Au format cf:col->alias.
  • upsertMaxBatchSize : nombre maximal d'opérations d'upsert à mettre en mémoire tampon avant d'effectuer l'opération d'upsert sur le lot dans l'index Vector Search. Les lots sont envoyés lorsque le nombre d'enregistrements upsertBatchSize est atteint ou lorsque le délai upsertBatchDelay est écoulé pour un enregistrement. Exemple :10 La valeur par défaut est 10.
  • upsertMaxBufferDuration : délai maximal avant l'envoi d'un lot d'opérations d'upsert à Vector Search.Les lots sont envoyés lorsque upsertBatchSize enregistrements sont prêts ou lorsqu'un enregistrement a attendu upsertBatchDelay. Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h). Exemple :10s La valeur par défaut est de 10 secondes.
  • deleteMaxBatchSize : nombre maximal de suppressions à mettre en mémoire tampon avant de supprimer le lot de l'index Vector Search. Les lots sont envoyés lorsque deleteBatchSize enregistrements sont prêts ou lorsqu'un enregistrement attend depuis deleteBatchDelay. Exemple :10 La valeur par défaut est 10.
  • deleteMaxBufferDuration : délai maximal avant l'envoi d'un lot de suppressions à Vector Search.Les lots sont envoyés lorsque deleteBatchSize enregistrements sont prêts ou lorsqu'un enregistrement a attendu le délai deleteBatchDelay. Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h). Exemple :10s La valeur par défaut est de 10 secondes.
  • dlqDirectory : chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire de la tâche Dataflow. La valeur par défaut est suffisante dans la plupart des conditions.
  • bigtableChangeStreamMetadataInstanceId : ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
  • bigtableChangeStreamMetadataTableTableId : ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
  • bigtableChangeStreamCharset : nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF8.
  • bigtableChangeStreamStartTimestamp : code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage de l'heure de début du pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies : liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamIgnoreColumns : liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. Exemple : "cf1:col1,cf2:col2". La valeur par défaut est vide.
  • bigtableChangeStreamName : nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
  • bigtableChangeStreamResume : lorsque ce paramètre est défini sur true, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeur bigtableChangeStreamName s'est arrêté. Si le pipeline avec la valeur bigtableChangeStreamName donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini sur false, un nouveau pipeline démarre. Si un pipeline avec la même valeur bigtableChangeStreamName a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut est false.
  • bigtableReadChangeStreamTimeoutMs : délai avant expiration (en millisecondes) des requêtes Bigtable ReadChangeStream.
  • bigtableReadProjectId : ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable Change Streams to Vector Search template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud CLI

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE : taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE : taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.