このテンプレートでは、Bigtable データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Vertex AI ベクトル検索に書き込むストリーミング パイプラインを作成します。
パイプラインの要件
- Bigtable ソース インスタンスが存在している必要があります。
- Bigtable ソーステーブルが存在し、そのテーブルで変更ストリームが有効になっている必要があります。
- Bigtable アプリケーション プロファイルが存在している必要があります。
- ベクトル検索インデックスのパスが存在している必要があります。
テンプレートのパラメータ
パラメータ | 説明 |
---|---|
embeddingColumn |
エンベディングが保存される列の完全修飾名。形式は cf:col。 |
embeddingByteSize |
エンベディング配列内の各エントリのバイトサイズ。浮動小数点数には 4、倍精度には 8 を使用します。デフォルトは 4 です。 |
vectorSearchIndex |
変更がストリーミングされるベクトル検索インデックス。形式は 'projects/{projectID}/locations/{region}/indexes/{indexID}' です(先頭または末尾にスペースを入れないでください)。例: projects/123/locations/us-east1/indexes/456 。 |
bigtableChangeStreamAppProfile |
Bigtable でワークロードを区別するために使用するアプリケーション プロファイル。 |
bigtableReadInstanceId |
テーブルが含まれている Bigtable インスタンスの ID。 |
bigtableReadTableId |
読み取り元の Bigtable テーブル。 |
bigtableMetadataTableTableId |
省略可: 作成されるメタデータ テーブルの ID。設定されていない場合、Bigtable で ID を生成します。 |
crowdingTagColumn |
省略可: クラウディング タグが保存される列の完全修飾名(形式は cf:col )。 |
allowRestrictsMappings |
省略可: allow の制限として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。 |
denyRestrictsMappings |
省略可: deny の制限として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。 |
intNumericRestrictsMappings |
省略可: 整数 numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります。 |
floatNumericRestrictsMappings |
省略可: 浮動小数点数(4 バイト)numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります |
doubleNumericRestrictsMappings |
省略可: 倍精度(8 バイト)numeric_restricts として使用する列の完全修飾名とそれらのエイリアスのカンマ区切りリスト。それぞれの列名は cf:col->alias の形式にする必要があります |
upsertMaxBatchSize |
省略可: バッチをベクトル検索インデックスに upsert する前にバッファに格納する upsert の最大数。バッチは、いずれかの upsertBatchSize レコードの準備が整ったときに送信されます。例: 10 |
upsertMaxBufferDuration |
省略可: upsert のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、いずれかの upsertBatchSize レコードの準備が整ったときに送信されます。指定できる形式は、秒が Ns (例: 5s)、分が Nm (例: 12m)、時が Nh (例: 2h)です。デフォルト: 10s 。 |
deleteMaxBatchSize |
省略可: ベクトル検索インデックスからバッチを削除する前にバッファに格納する削除の最大数。バッチは、いずれかの deleteBatchSize レコードの準備が整ったときに送信されます。例: 10 。 |
deleteMaxBufferDuration |
省略可: 削除のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、いずれかの deleteBatchSize レコードの準備が整ったときに送信されます。指定できる形式は、秒が Ns (例: 5s)、分が Nm (例: 12m)、時が Nh (例: 2h)です。デフォルト: 10s 。 |
dlqDirectory |
省略可: 処理できなかった理由とともに、未処理のレコードを保存するパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。デフォルト値はほとんどのシナリオに適しています。 |
bigtableChangeStreamMetadataInstanceId |
省略可: 変更ストリーム コネクタのメタデータ テーブルに使用する Bigtable インスタンス。デフォルトは空です。 |
bigtableChangeStreamMetadataTableTableId |
省略可: 使用する Bigtable 変更ストリーム コネクタのメタデータ テーブルの ID。指定しない場合、パイプライン フロー中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。 |
bigtableChangeStreamCharset |
省略可: 値と列修飾子を読み取るときの Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 です。 |
bigtableChangeStreamStartTimestamp |
省略可: 変更ストリームの読み取りに使用される開始日時(この値を含む)(https://tools.ietf.org/html/rfc3339)。例: 2022-05-05T07:59:59Z。デフォルトは、パイプライン開始時のタイムスタンプです。 |
bigtableChangeStreamIgnoreColumnFamilies |
省略可: キャプチャされない列ファミリー名における変更のカンマ区切りのリスト。デフォルトは空です。 |
bigtableChangeStreamIgnoreColumns |
省略可: キャプチャされない列名における変更のカンマ区切りのリスト。デフォルトは空です。 |
bigtableChangeStreamName |
省略可: クライアント パイプラインの一意の名前。このパラメータを使用すると、以前に実行していたパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。 |
bigtableChangeStreamResume |
省略可: true に設定すると、同じ名前で以前に実行していたパイプラインが停止した時点から、新しいパイプラインで処理を再開します。その名前のパイプラインが過去に実行されていない場合、新しいパイプラインを開始することはできません。 false に設定すると、新しいパイプラインが開始されます。指定したソースに対して デフォルトは false です。 |
bigtableReadProjectId |
省略可: Bigtable データの読み取り元のプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが実行されているプロジェクトです。 |
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable Change Streams to Vector Search template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud CLI
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)EMBEDDING_COLUMN
: エンベディング列EMBEDDING_BYTE_SIZE
: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。VECTOR_SEARCH_INDEX
: ベクトル検索インデックスのパスBIGTABLE_CHANGE_STREAM_APP_PROFILE
: Bigtable アプリケーション プロファイル IDBIGTABLE_READ_INSTANCE_ID
: ソース Bigtable インスタンス IDBIGTABLE_READ_TABLE_ID
: ソース Bigtable テーブル ID
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)EMBEDDING_COLUMN
: エンベディング列EMBEDDING_BYTE_SIZE
: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。VECTOR_SEARCH_INDEX
: ベクトル検索インデックスのパスBIGTABLE_CHANGE_STREAM_APP_PROFILE
: Bigtable アプリケーション プロファイル IDBIGTABLE_READ_INSTANCE_ID
: ソース Bigtable インスタンス IDBIGTABLE_READ_TABLE_ID
: ソース Bigtable テーブル ID