Modèle de flux de modifications Bigtable vers Vector Search

Ce modèle crée un pipeline de traitement en flux continu pour diffuser des enregistrements de modification de données Bigtable et les écrire dans Vertex AI Vector Search à l'aide de l'exécuteur Dataflow V2.

Conditions requises pour ce pipeline

  • L'instance source Bigtable doit exister.
  • La table source Bigtable doit exister et les flux de modifications doivent être activés.
  • Le profil d'application Bigtable doit exister.
  • Le chemin d'accès à l'index Vector Search doit exister.

Paramètres de modèle

Paramètre Description
embeddingColumn Nom complet de la colonne dans laquelle les embeddings sont stockés. Au format cf:col.
embeddingByteSize Taille en octets de chaque entrée du tableau des embeddings. Utilisez 4 pour les valeurs flottantes et 8 pour les valeurs doubles. La valeur par défaut est 4.
vectorSearchIndex Index Vector Search où les modifications seront diffusées, au format "projects/{projectID}/locations/{region}/indexes/{indexID}" (sans espaces de début ni de fin). Par exemple : projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile Profil d'application utilisé pour distinguer les charges de travail dans Bigtable.
bigtableReadInstanceId ID de l'instance Bigtable qui contient la table.
bigtableReadTableId Table Bigtable à partir de laquelle lire.
bigtableMetadataTableTableId Facultatif: ID de la table de métadonnées créée. Si cette valeur n'est pas définie, Bigtable génère un ID.
crowdingTagColumn Facultatif : Nom complet de la colonne dans laquelle le tag de regroupement est stocké, au format cf:col.
allowRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions allow, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
denyRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme restrictions deny, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
intNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme entiers numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
floatNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs flottantes (4 octets) numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
doubleNumericRestrictsMappings Facultatif : Noms de colonnes complets, séparés par une virgule, pour les colonnes à utiliser comme valeurs doubles (8 octets) numeric_restricts, avec leurs alias. Chaque nom de colonne doit être au format cf:col->alias.
upsertMaxBatchSize Facultatif : Nombre maximal d'opérations d'upsert à mettre en mémoire tampon avant d'effectuer l'opération d'upsert sur le lot dans l'index Vector Search. Les lots sont envoyés lorsqu'il y a upsertBatchSize enregistrements prêts. Exemple : 10.
upsertMaxBufferDuration Facultatif : Délai maximal avant l'envoi d'un lot d'opérations d'upsert à Vector Search. Les lots sont envoyés lorsqu'il y a au moins upsertBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s.
deleteMaxBatchSize Facultatif : Nombre maximal de suppressions à mettre en mémoire tampon avant de supprimer le lot de l'index Vector Search. Les lots sont envoyés lorsqu'il y a au moins deleteBatchSize enregistrements prêts. Exemple : 10.
deleteMaxBufferDuration Facultatif : Délai maximal avant l'envoi d'un lot de suppressions à Vector Search. Les lots sont envoyés lorsqu'il y a deleteBatchSize enregistrements prêts. Les formats autorisés sont les suivants : Ns pour les secondes (exemple : 5 s), Nm pour les minutes (exemple : 12 min) et Nh pour les heures (exemple : 2 h). Valeur par défaut : 10s.
dlqDirectory Facultatif : Chemin d'accès au fichier de stockage des enregistrements non traités et le motif pour lesquels ils n'ont pas pu être traités. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. La valeur par défaut convient à la plupart des scénarios.
bigtableChangeStreamMetadataInstanceId Facultatif : Instance Bigtable à utiliser pour la table de métadonnées du connecteur de flux de modifications. La valeur par défaut est vide.
bigtableChangeStreamMetadataTableTableId Facultatif : ID de table de métadonnées du connecteur de flux de modifications Bigtable à utiliser. Si aucune valeur n'est fournie, une table des métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant le flux de pipeline. La valeur par défaut est vide.
bigtableChangeStreamCharset Facultatif : Nom du charset de flux de modifications Bigtable lors de la lecture des valeurs et des qualificatifs de colonnes. La valeur par défaut est UTF-8.
bigtableChangeStreamStartTimestamp Facultatif : Date et heure de début (incluses) à utiliser pour lire les flux de modifications (https://tools.ietf.org/html/rfc3339). Par exemple, 2020-01-16T16:56:00.000Z. La valeur par défaut est l'horodatage du démarrage du pipeline.
bigtableChangeStreamIgnoreColumnFamilies Facultatif : Liste des modifications de noms de familles de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide.
bigtableChangeStreamIgnoreColumns Facultatif : Liste des modifications de noms de colonnes, séparées par une virgule, qui ne seront pas enregistrées. La valeur par défaut est vide.
bigtableChangeStreamName Facultatif : nom unique pour le pipeline client. Ce paramètre vous permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
bigtableChangeStreamResume

Facultatif : Lorsque ce paramètre est défini sur "true", un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec le même nom s'est arrêté. Si un pipeline portant ce nom n'a jamais été exécuté par le passé, le nouveau pipeline ne démarre pas. Utilisez le paramètre bigtableChangeStreamName pour spécifier la ligne du pipeline.

Lorsque ce paramètre est défini sur "false", un nouveau pipeline est démarré. Si un pipeline portant le même nom que bigtableChangeStreamName a déjà été exécuté pour la source donnée, le nouveau pipeline ne démarre pas.

Valeur par défaut : "false".

bigtableReadProjectId Facultatif: Projet à partir duquel lire les données Bigtable. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable Change Streams to Vector Search template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud CLI

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • EMBEDDING_COLUMN : colonne "Embedding".
  • EMBEDDING_BYTE_SIZE: taille en octets du tableau des embeddings. Il peut s'agir de 4 ou 8.
  • VECTOR_SEARCH_INDEX : chemin d'accès de l'index Vector Search.
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE : ID de profil d'application Bigtable.
  • BIGTABLE_READ_INSTANCE_ID : ID de l'instance Bigtable source.
  • BIGTABLE_READ_TABLE_ID : ID de la table Bigtable source.