Batch predictions with BigQuery

This page shows you how to get batch predictions by using BigQuery.

1. Prepare your input

BigQuery storage input

Grant the roles/bigquery.user role to the service account that reads your input table:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SERVICE_ACCOUNT_ID@PROJECT_ID.iam.gserviceaccount.com" \
        --role="roles/bigquery.user"
  

Replace the following values:

  • PROJECT_ID: The project that your service account was created in.
  • SERVICE_ACCOUNT_ID: The ID for the service account.
Format your input table as follows:

  • The request column is required and must be valid JSON. This JSON data represents your input to the model.
  • The content in the request column must match the structure of GenerateContentRequest.
  • Your input table can have columns other than request. These columns can have any BigQuery data type except the following: array, struct, range, datetime, and geography. These columns are ignored for content generation but are included in the output table.
Example input (JSON)
        
{
  "contents": [
    {
      "role": "user",
      "parts": [
        {
          "text": "Give me a recipe for banana bread."
        }
      ]
    }
  ],
  "system_instruction": {
    "parts": [
      {
        "text": "You are a chef."
      }
    ]
  }
}
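As a sketch, you could populate the request column by serializing dictionaries of this shape with Python's standard json module (the prompt text below is illustrative; each row carries its own request):

```python
import json

# Build a GenerateContentRequest-shaped payload for one input row.
request = {
    "contents": [
        {"role": "user", "parts": [{"text": "Give me a recipe for banana bread."}]}
    ],
    "system_instruction": {"parts": [{"text": "You are a chef."}]},
}

# Serialize to a JSON string suitable for the `request` column.
request_json = json.dumps(request)

# Round-trip to confirm the string is valid JSON before loading it into BigQuery.
assert json.loads(request_json)["contents"][0]["role"] == "user"
```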
        
        

2. Submit a batch job

You can create a batch job by using the Google Cloud console, the Google Gen AI SDK, or the REST API.

The job and your table must be in the same region.

控制台

  1. In the Vertex AI section of the Google Cloud console, go to the Batch inference page.

    Go to Batch inference

  2. Click Create.

REST

To create a batch prediction job, use the projects.locations.batchPredictionJobs.create method.

Before using any of the request data, make the following replacements:

  • LOCATION: A region that supports Gemini models.
  • PROJECT_ID: Your project ID.
  • MODEL_PATH: The publisher model name, for example, publishers/google/models/gemini-2.0-flash-001; or the tuned endpoint name, for example, projects/PROJECT_ID/locations/LOCATION/models/MODEL_ID, where MODEL_ID is the model ID of the tuned model.
  • INPUT_URI: The BigQuery table where your batch prediction input is located, such as bq://myproject.mydataset.input_table. The dataset must be in the same region as the batch prediction job. Multi-region datasets aren't supported.
  • OUTPUT_FORMAT: To output to a BigQuery table, specify bigquery. To output to a Cloud Storage bucket, specify jsonl.
  • DESTINATION: For BigQuery, specify bigqueryDestination. For Cloud Storage, specify gcsDestination.
  • OUTPUT_URI_FIELD_NAME: For BigQuery, specify outputUri. For Cloud Storage, specify outputUriPrefix.
  • OUTPUT_URI: For BigQuery, specify the table location, such as bq://myproject.mydataset.output_result. The region of the output BigQuery dataset must be the same as the region of the Vertex AI batch prediction job. For Cloud Storage, specify the bucket and directory location, such as gs://mybucket/path/to/output.
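To keep the BigQuery-output replacements straight, the request body can be assembled programmatically before saving it to request.json; a minimal sketch, where the project, dataset, and table names are placeholders:

```python
import json

# Placeholder values; substitute your own model path, dataset, and tables.
model_path = "publishers/google/models/gemini-2.0-flash-001"
input_uri = "bq://myproject.mydataset.input_table"
output_uri = "bq://myproject.mydataset.output_result"

# For BigQuery output, DESTINATION is bigqueryDestination and the
# URI field name is outputUri, per the replacement list above.
body = {
    "displayName": "my-bigquery-batch-prediction-job",
    "model": model_path,
    "inputConfig": {
        "instancesFormat": "bigquery",
        "bigquerySource": {"inputUri": input_uri},
    },
    "outputConfig": {
        "predictionsFormat": "bigquery",
        "bigqueryDestination": {"outputUri": output_uri},
    },
}

# Write this string to request.json for the curl/PowerShell commands below.
request_json = json.dumps(body, indent=2)
```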

HTTP method and URL:

POST https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs

Request JSON body:

{
  "displayName": "my-bigquery-batch-prediction-job",
  "model": "MODEL_PATH",
  "inputConfig": {
    "instancesFormat": "bigquery",
    "bigquerySource":{
      "inputUri" : "INPUT_URI"
    }
  },
  "outputConfig": {
    "predictionsFormat": "OUTPUT_FORMAT",
    "DESTINATION": {
      "OUTPUT_URI_FIELD_NAME": "OUTPUT_URI"
    }
  }
}

To send your request, choose one of these options:

curl

Save the request body in a file named request.json, and execute the following command:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs"

PowerShell

Save the request body in a file named request.json, and execute the following command:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs" | Select-Object -Expand Content

You should receive a JSON response similar to the following.

The response includes a unique identifier for the batch job. You can poll for the status of the batch job by using the BATCH_JOB_ID. For more information, see Monitor the job status and progress. Note: Custom service accounts, live progress, CMEK, and VPCSC reporting aren't supported.

Python

Install

pip install --upgrade google-genai

To learn more, see the SDK reference documentation.

Set environment variables to use the Gen AI SDK with Vertex AI:

# Replace the `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION` values
# with appropriate values for your project.
export GOOGLE_CLOUD_PROJECT=GOOGLE_CLOUD_PROJECT
export GOOGLE_CLOUD_LOCATION=global
export GOOGLE_GENAI_USE_VERTEXAI=True

import time

from google import genai
from google.genai.types import CreateBatchJobConfig, JobState, HttpOptions

client = genai.Client(http_options=HttpOptions(api_version="v1"))

# TODO(developer): Update and un-comment below line
# output_uri = f"bq://your-project.your_dataset.your_table"

job = client.batches.create(
    # To use a tuned model, set the model param to your tuned model using the following format:
    # model="projects/{PROJECT_ID}/locations/{LOCATION}/models/{MODEL_ID}"
    model="gemini-2.5-flash",
    src="bq://storage-samples.generative_ai.batch_requests_for_multimodal_input",
    config=CreateBatchJobConfig(dest=output_uri),
)
print(f"Job name: {job.name}")
print(f"Job state: {job.state}")
# Example response:
# Job name: projects/%PROJECT_ID%/locations/us-central1/batchPredictionJobs/9876453210000000000
# Job state: JOB_STATE_PENDING

# See the documentation: https://googleapis.github.io/python-genai/genai.html#genai.types.BatchJob
completed_states = {
    JobState.JOB_STATE_SUCCEEDED,
    JobState.JOB_STATE_FAILED,
    JobState.JOB_STATE_CANCELLED,
    JobState.JOB_STATE_PAUSED,
}

while job.state not in completed_states:
    time.sleep(30)
    job = client.batches.get(name=job.name)
    print(f"Job state: {job.state}")
# Example response:
# Job state: JOB_STATE_PENDING
# Job state: JOB_STATE_RUNNING
# Job state: JOB_STATE_RUNNING
# ...
# Job state: JOB_STATE_SUCCEEDED

3. Monitor job status and progress

After your job is submitted, you can check the status of your batch job by using the API, the SDK, or the Cloud console.

控制台

  1. Go to the Batch inference page.

    Go to Batch inference

  2. Select your batch job to monitor its progress.

REST

To monitor a batch prediction job, use the projects.locations.batchPredictionJobs.get method and view the CompletionStats field in the response.

Before using any of the request data, make the following replacements:

  • LOCATION: A region that supports Gemini models.
  • PROJECT_ID: Your project ID.
  • BATCH_JOB_ID: Your batch job ID.

HTTP method and URL:

GET https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs/BATCH_JOB_ID

To send your request, choose one of these options:

curl

Execute the following command:

curl -X GET \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs/BATCH_JOB_ID"

PowerShell

Execute the following command:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method GET `
-Headers $headers `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs/BATCH_JOB_ID" | Select-Object -Expand Content

You should receive a JSON response similar to the following.

Python

Install

pip install --upgrade google-genai

To learn more, see the SDK reference documentation.

Set environment variables to use the Gen AI SDK with Vertex AI:

# Replace the `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION` values
# with appropriate values for your project.
export GOOGLE_CLOUD_PROJECT=GOOGLE_CLOUD_PROJECT
export GOOGLE_CLOUD_LOCATION=global
export GOOGLE_GENAI_USE_VERTEXAI=True

import time

from google import genai
from google.genai.types import JobState, HttpOptions

client = genai.Client(http_options=HttpOptions(api_version="v1"))

# TODO(developer): Update and un-comment below line with your batch job's resource name
# job_name = "projects/your-project/locations/us-central1/batchPredictionJobs/your-job-id"

job = client.batches.get(name=job_name)
print(f"Job state: {job.state}")

# See the documentation: https://googleapis.github.io/python-genai/genai.html#genai.types.BatchJob
completed_states = {
    JobState.JOB_STATE_SUCCEEDED,
    JobState.JOB_STATE_FAILED,
    JobState.JOB_STATE_CANCELLED,
    JobState.JOB_STATE_PAUSED,
}

while job.state not in completed_states:
    time.sleep(30)
    job = client.batches.get(name=job.name)
    print(f"Job state: {job.state}")
# Example response:
# Job state: JOB_STATE_PENDING
# Job state: JOB_STATE_RUNNING
# Job state: JOB_STATE_RUNNING
# ...
# Job state: JOB_STATE_SUCCEEDED

The status of a given batch job can be any of the following:

  • JOB_STATE_PENDING: The job is waiting in a queue for capacity. The job can remain in the queue state for up to 72 hours before it enters the running state.
  • JOB_STATE_RUNNING: The input file was validated successfully, and the batch job is currently running.
  • JOB_STATE_SUCCEEDED: The batch job has completed and the results are ready.
  • JOB_STATE_FAILED: The input file failed the validation process, or the job couldn't complete within 24 hours after entering the RUNNING state.
  • JOB_STATE_CANCELLING: The batch job is being cancelled.
  • JOB_STATE_CANCELLED: The batch job was cancelled.
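When polling, you only need to distinguish terminal states from in-progress ones; a small helper sketch, assuming the state names listed above:

```python
# Terminal states: polling can stop once the job reaches one of these.
TERMINAL_STATES = {
    "JOB_STATE_SUCCEEDED",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
}

def is_terminal(state: str) -> bool:
    """Return True if the batch job has finished, successfully or not."""
    return state in TERMINAL_STATES
```

PENDING, RUNNING, and CANCELLING are transient, so a poll loop should keep waiting while is_terminal returns False.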

4. Retrieve batch output

When a batch prediction task completes, the output is stored in the BigQuery table that you specified in your request.

For rows that succeeded, the model response is stored in the response column. Otherwise, error details are stored in the status column for further inspection.

Output example

Successful example

{
  "candidates": [
    {
      "content": {
        "role": "model",
        "parts": [
          {
            "text": "In a medium bowl, whisk together the flour, baking soda, baking powder."
          }
        ]
      },
      "finishReason": "STOP",
      "safetyRatings": [
        {
          "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
          "probability": "NEGLIGIBLE",
          "probabilityScore": 0.14057204,
          "severity": "HARM_SEVERITY_NEGLIGIBLE",
          "severityScore": 0.14270912
        }
      ]
    }
  ],
  "usageMetadata": {
    "promptTokenCount": 8,
    "candidatesTokenCount": 396,
    "totalTokenCount": 404
  }
}

Failed example

  • Request

    {"contents":[{"parts":{"text":"Explain how AI works in a few words."},"role":"tester"}]}
    
  • Response

    Bad Request: {"error": {"code": 400, "message": "Please use a valid role: user, model.", "status": "INVALID_ARGUMENT"}}
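Putting the two cases together, here is a sketch of post-processing one output row. The row dict and its field access are assumptions based on the response and status columns and the example payloads above, not an official helper:

```python
import json

def extract_text(row: dict) -> str:
    """Return the model text for a succeeded row, or the error details.

    `row` holds the output table's `response` and `status` columns,
    each a JSON (or error) string; the unused column is empty.
    """
    if row.get("status"):
        # Failed row: surface the error details for inspection.
        return f"ERROR: {row['status']}"
    response = json.loads(row["response"])
    return response["candidates"][0]["content"]["parts"][0]["text"]

# Hypothetical succeeded row, shaped like the successful example above.
ok_row = {
    "status": "",
    "response": json.dumps({
        "candidates": [
            {"content": {"role": "model",
                         "parts": [{"text": "Whisk together the flour."}]}}
        ]
    }),
}
print(extract_text(ok_row))  # prints: Whisk together the flour.
```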