批量预测

批量预测是一种有价值的技术,可用于高效地将机器学习模型应用于大型数据集。您可以将一批数据提交给 Gemini 进行预测,而不是处理单个数据点,从而节省时间和计算资源。与在线预测(一次只能有一个输入提示)不同,您可以在单个批量请求中发送大量多模态提示。然后,您的回答会异步填充到 BigQuery 或 Cloud Storage 存储空间输出位置。

Gemini 模型的批量请求比标准请求享受 50% 的折扣。如需了解详情,请参阅价格页面

批量预测用例

假设某个在线书店的数据库中有数千本图书。该商店可以使用 Gemini 批量预测功能一次处理所有图书信息,而无需为每本图书单独生成说明,这将大大节省时间。这种方法通过缩短总体处理时间并最大限度地减少所需的计算资源,显著提高了效率。

批量预测还可以提高自动化操作的一致性。通过同时处理所有说明,该模型可确保所有图书说明保持一致的基调和风格,从而强化品牌形象。该书店还可以将批量预测集成到其工作流中,以自动为新图书条目生成说明,从而免去手动操作,并确保其网站始终保持最新状态,同时尽可能减少人工干预。

支持批量预测的多模态模型

以下多模态模型支持批量预测。

  • gemini-1.5-flash-002
  • gemini-1.5-flash-001
  • gemini-1.5-pro-002
  • gemini-1.5-pro-001
  • gemini-1.0-pro-002
  • gemini-1.0-pro-001

多模态模型的批量请求接受 BigQuery 存储源和 Cloud Storage 源。您可以独立选择将预测结果输出到 BigQuery 表或 Cloud Storage 存储桶中的 JSONL 文件。

针对 Cloud Storage 的批量预测

准备输入

Cloud Storage 输入

  • 文件格式:JSON 行 (JSONL)
  • 位于 us-central1
  • 服务账号的适当读取权限
  • fileData 支持仅限于某些 Gemini 型号。
    示例输入(JSONL)
    
    {"request":{"contents": [{"role": "user", "parts": [{"text": "What is the relation between the following video and image samples?"}, {"fileData": {"fileUri": "gs://cloud-samples-data/generative-ai/video/animals.mp4", "mimeType": "video/mp4"}}, {"fileData": {"fileUri": "gs://cloud-samples-data/generative-ai/image/cricket.jpeg", "mimeType": "image/jpeg"}}]}]}}
    {"request":{"contents": [{"role": "user", "parts": [{"text": "Describe what is happening in this video."}, {"fileData": {"fileUri": "gs://cloud-samples-data/generative-ai/video/another_video.mov", "mimeType": "video/mov"}}]}]}}
        

请求批量预测作业

指定 Cloud Storage 输入表、模型和输出位置。

REST

如需创建批量预测作业,请使用 projects.locations.batchPredictionJobs.create 方法。

在使用任何请求数据之前,请先进行以下替换:

  • LOCATION:支持 Gemini 模型的区域。
  • PROJECT_ID:您的项目 ID
  • INPUT_URI:JSONL 批量预测输入(例如 gs://bucketname/path/to/file.jsonl)的 Cloud Storage 位置。
  • OUTPUT_FORMAT:如需输出到 BigQuery 表,请指定 bigquery。如需输出到 Cloud Storage 存储桶,请指定 jsonl
  • DESTINATION:对于 BigQuery,请指定 bigqueryDestination。对于 Cloud Storage,请指定 gcsDestination
  • OUTPUT_URI_FIELD_NAME:对于 BigQuery,请指定 outputUri。对于 Cloud Storage,请指定 outputUriPrefix
  • OUTPUT_URI:对于 BigQuery,请指定表位置,例如 bq://myproject.mydataset.output_result。输出 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。 对于 Cloud Storage,请指定存储桶和目录位置,例如 gs://mybucket/path/to/output

HTTP 方法和网址:

POST https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs

请求 JSON 正文:

{
  "displayName": "my-cloud-storage-batch-prediction-job",
  "model": "publishers/google/models/gemini-1.5-flash-002",
  "inputConfig": {
    "instancesFormat": "jsonl",
    "gcsSource": {
      "uris" : "INPUT_URI"
    }
  },
  "outputConfig": {
    "predictionsFormat": "OUTPUT_FORMAT",
    "DESTINATION": {
      "OUTPUT_URI_FIELD_NAME": "OUTPUT_URI"
    }
  }
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs"

PowerShell

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs" | Select-Object -Expand Content

您应该收到类似以下内容的 JSON 响应。

响应包含批量作业的唯一标识符。您可以使用 BATCH_JOB_ID 轮询批量作业的状态,直到作业 stateJOB_STATE_SUCCEEDED。 例如:

curl \
  -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/batchPredictionJobs/BATCH_JOB_ID

Python

如需了解如何安装或更新 Vertex AI SDK for Python,请参阅安装 Vertex AI SDK for Python。 如需了解详情,请参阅 Python API 参考文档

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "gs://cloud-samples-data/batch/prompt_for_batch_gemini_predict.jsonl"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: gs://your-bucket/gen-ai-batch-prediction/prediction-model-year-month-day-hour:minute:second.12345

Node.js

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Node.js API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output folder in Google Cloud Storage.
// E.g. "gs://[BUCKET]/[OUTPUT]"
// outputUri = 'gs://my-bucket';

// URI of the input file in Google Cloud Storage.
// E.g. "gs://[BUCKET]/[DATASET].jsonl"
// Or try:
// "gs://cloud-samples-data/generative-ai/batch/gemini_multimodal_batch_predict.jsonl"
// for a batch prediction that uses audio, video, and an image.
const inputUri =
  'gs://cloud-samples-data/generative-ai/batch/batch_requests_for_multimodal_input.jsonl';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using Google Cloud Storage input and output buckets.
async function create_batch_prediction_gemini_gcs() {
  const gcsSource = new aiplatform.GcsSource({
    uris: [inputUri],
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    gcsSource: gcsSource,
    instancesFormat: 'jsonl',
  });

  const gcsDestination = new aiplatform.GcsDestination({
    outputUriPrefix: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    gcsDestination: gcsDestination,
    predictionsFormat: 'jsonl',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - GCS',
    model: modelName,
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_gcs();

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.GcsDestination;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String gcsDestinationOutputUriPrefix = "gs://MY_BUCKET/";

    createBatchPredictionGeminiJobSample(project, gcsDestinationOutputUriPrefix);
  }

  // Create a batch prediction job using a JSONL input file and output URI, both in Cloud
  // Storage.
  public static BatchPredictionJob createBatchPredictionGeminiJobSample(
      String project, String gcsDestinationOutputUriPrefix) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      GcsSource gcsSource =
          GcsSource.newBuilder()
              .addUris(
                  "gs://cloud-samples-data/generative-ai/batch/"
                      + "batch_requests_for_multimodal_input.jsonl")
              // Or try
              // "gs://cloud-samples-data/generative-ai/batch/gemini_multimodal_batch_predict.jsonl"
              // for a batch prediction that uses audio, video, and an image.
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("jsonl")
              .setGcsSource(gcsSource)
              .build();
      GcsDestination gcsDestination =
          GcsDestination.newBuilder().setOutputUriPrefix(gcsDestinationOutputUriPrefix).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("jsonl")
              .setGcsDestination(gcsDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input jsonl file.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

Go

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Go 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Go API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictGCS submits a batch prediction job using GCS data source as its input
func batchPredictGCS(w io.Writer, projectID, location string, inputURIs []string, outputURI string) error {
	// location := "us-central1"
	// inputURIs := []string{"gs://cloud-samples-data/batch/prompt_for_batch_gemini_predict.jsonl"}
	// outputURI := "gs://<cloud-bucket-name>/<prefix-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-gcs-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_GcsSource{
					GcsSource: &aiplatformpb.GcsSource{
						Uris: inputURIs,
					},
				},
				InstancesFormat: "jsonl",
			},
			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_GcsDestination{
					GcsDestination: &aiplatformpb.GcsDestination{
						OutputUriPrefix: outputURI,
					},
				},
				PredictionsFormat: "jsonl",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

批量预测输出

批量预测任务完成后,输出存储在您在请求中指定的 Cloud Storage 存储桶或 BigQuery 表中。对于成功的请求行,模型响应会存储在 response 列中。否则,错误详情会存储在 status 列中以供进一步检查。

在长时间运行的作业期间,系统会持续将完成的预测导出到指定的输出目标位置。此过程会在 90 分钟后开始。如果批量预测作业被取消或失败,系统会导出所有已完成的预测。

Cloud Storage 输出示例

{
  "status": "",
  "processed_time": "2024-11-01T18:13:16.826+00:00",
  "request": {
    "contents": [
      {
        "parts": [
          {
            "fileData": null,
            "text": "What is the relation between the following video and image samples?"
          },
          {
            "fileData": {
              "fileUri": "gs://cloud-samples-data/generative-ai/video/animals.mp4",
              "mimeType": "video/mp4"
            },
            "text": null
          },
          {
            "fileData": {
              "fileUri": "gs://cloud-samples-data/generative-ai/image/cricket.jpeg",
              "mimeType": "image/jpeg"
            },
            "text": null
          }
        ],
        "role": "user"
      }
    ]
  },
  "response": {
    "candidates": [
      {
        "avgLogprobs": -0.5782725546095107,
        "content": {
          "parts": [
            {
              "text": "This video shows a Google Photos marketing campaign where animals at the Los Angeles Zoo take self-portraits using a modified Google phone housed in a protective case. The image is unrelated."
            }
          ],
          "role": "model"
        },
        "finishReason": "STOP"
      }
    ],
    "modelVersion": "gemini-1.5-flash-002@default",
    "usageMetadata": {
      "candidatesTokenCount": 36,
      "promptTokenCount": 29180,
      "totalTokenCount": 29216
    }
  }
}

适用于 BigQuery 的批量预测

指定 BigQuery 输入表、模型和输出位置。批量预测作业和表必须位于同一区域。

准备输入

BigQuery 存储输入

  • request 列是必需的,并且必须是有效的 JSON。此 JSON 数据代表您要提供给模型的输入。
  • request 列中的内容必须与 GenerateContentRequest 的结构一致。
  • 输入表可以包含 request 以外的列数据类型。这些列可以采用 BigQuery 数据类型,但以下类型除外:数组、结构体、范围、日期时间和地理位置。这些列在内容生成中会被忽略,但会包含在输出表中。系统为输出预留了两个列名称:responsestatus。这些参数用于提供有关批量预测作业结果的信息。
  • fileData 支持仅限于某些 Gemini 型号。
示例输入(JSON)
        
{
  "contents": [
    {
      "role": "user",
      "parts": [
        {
          "text": "Give me a recipe for banana bread."
        }
      ]
    }
  ],
  "system_instruction": {
    "parts": [
      {
        "text": "You are a chef."
      }
    ]
  }
}
        
        

请求批量预测作业

REST

如需创建批量预测作业,请使用 projects.locations.batchPredictionJobs.create 方法。

在使用任何请求数据之前,请先进行以下替换:

  • LOCATION:支持 Gemini 模型的区域。
  • PROJECT_ID:您的项目 ID
  • INPUT_URI:批量预测输入所在的 BigQuery 表,例如 bq://myproject.mydataset.input_table。不支持多区域数据集。
  • OUTPUT_FORMAT:如需输出到 BigQuery 表,请指定 bigquery。如需输出到 Cloud Storage 存储桶,请指定 jsonl
  • DESTINATION:对于 BigQuery,请指定 bigqueryDestination。对于 Cloud Storage,请指定 gcsDestination
  • OUTPUT_URI_FIELD_NAME:对于 BigQuery,请指定 outputUri。对于 Cloud Storage,请指定 outputUriPrefix
  • OUTPUT_URI:对于 BigQuery,请指定表位置,例如 bq://myproject.mydataset.output_result。输出 BigQuery 数据集的区域必须与 Vertex AI 批量预测作业的区域相同。 对于 Cloud Storage,请指定存储桶和目录位置,例如 gs://mybucket/path/to/output

HTTP 方法和网址:

POST https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs

请求 JSON 正文:

{
  "displayName": "my-bigquery-batch-prediction-job",
  "model": "publishers/google/models/gemini-1.5-flash-002",
  "inputConfig": {
    "instancesFormat": "bigquery",
    "bigquerySource":{
      "inputUri" : "INPUT_URI"
    }
  },
  "outputConfig": {
    "predictionsFormat": "OUTPUT_FORMAT",
    "DESTINATION": {
      "OUTPUT_URI_FIELD_NAME": "OUTPUT_URI"
    }
  }
}

如需发送请求,请选择以下方式之一:

curl

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs"

PowerShell

将请求正文保存在名为 request.json 的文件中,然后执行以下命令:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/batchPredictionJobs" | Select-Object -Expand Content

您应该收到类似以下内容的 JSON 响应。

响应包含批量作业的唯一标识符。您可以使用 BATCH_JOB_ID 轮询批量作业的状态,直到作业 stateJOB_STATE_SUCCEEDED。 例如:

curl \
  -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/batchPredictionJobs/BATCH_JOB_ID

Python

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Python 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Python API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: bq://Project-ID/gen-ai-batch-prediction/predictions-model-year-month-day-hour:minute:second.12345

Node.js

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Node.js API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
// outputUri = 'bq://projectid.dataset.table';

// URI of the multimodal input BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
const inputUri =
  'bq://storage-samples.generative_ai.batch_requests_for_multimodal_input';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using BigQuery input and output datasets.
async function create_batch_prediction_gemini_bq() {
  const bqSource = new aiplatform.BigQuerySource({
    inputUri: inputUri,
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    bigquerySource: bqSource,
    instancesFormat: 'bigquery',
  });

  const bqDestination = new aiplatform.BigQueryDestination({
    outputUri: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    bigqueryDestination: bqDestination,
    predictionsFormat: 'bigquery',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - BigQuery',
    model: modelName, // Add model parameters per request in the input BigQuery table.
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_bq();

Java

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Java 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Java API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.BigQueryDestination;
import com.google.cloud.aiplatform.v1.BigQuerySource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiBigqueryJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String bigqueryDestinationOutputUri = "bq://PROJECT_ID.MY_DATASET.MY_TABLE";

    createBatchPredictionGeminiBigqueryJobSample(project, bigqueryDestinationOutputUri);
  }

  // Create a batch prediction job using BigQuery input and output datasets.
  public static BatchPredictionJob createBatchPredictionGeminiBigqueryJobSample(
      String project, String bigqueryDestinationOutputUri) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      BigQuerySource bigquerySource =
          BigQuerySource.newBuilder()
              .setInputUri("bq://storage-samples.generative_ai.batch_requests_for_multimodal_input")
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("bigquery")
              .setBigquerySource(bigquerySource)
              .build();
      BigQueryDestination bigqueryDestination =
          BigQueryDestination.newBuilder().setOutputUri(bigqueryDestinationOutputUri).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("bigquery")
              .setBigqueryDestination(bigqueryDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input BigQuery table.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

Go

在尝试此示例之前,请按照《Vertex AI 快速入门:使用客户端库》中的 Go 设置说明执行操作。 如需了解详情,请参阅 Vertex AI Go API 参考文档

如需向 Vertex AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictBQ submits a batch prediction job using BigQuery data source as its input
func batchPredictBQ(w io.Writer, projectID, location string, inputURI string, outputURI string) error {
	// location  := "us-central1"
	// inputURI  := "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"
	// outputURI := "bq://<cloud-project-name>.<dataset-name>.<table-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-bq-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_BigquerySource{
					BigquerySource: &aiplatformpb.BigQuerySource{
						InputUri: inputURI,
					},
				},
				InstancesFormat: "bigquery",
			},

			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_BigqueryDestination{
					BigqueryDestination: &aiplatformpb.BigQueryDestination{
						OutputUri: outputURI,
					},
				},
				PredictionsFormat: "bigquery",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

检索批量输出

批量预测任务完成后,输出存储在您在请求中指定的 BigQuery 表中。

对于成功的请求行,模型响应会存储在 response 列中。否则,错误详情会存储在 status 列中以供进一步检查。

BigQuery 输出示例

请求 Response 状态
{"content":[{...}]}
{
  "candidates": [
    {
      "content": {
        "role": "model",
        "parts": [
          {
            "text": "In a medium bowl, whisk together the flour, baking soda, baking powder."
          }
        ]
      },
      "finishReason": "STOP",
      "safetyRatings": [
        {
          "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
          "probability": "NEGLIGIBLE",
          "probabilityScore": 0.14057204,
          "severity": "HARM_SEVERITY_NEGLIGIBLE",
          "severityScore": 0.14270912
        }
      ]
    }
  ],
  "usageMetadata": {
    "promptTokenCount": 8,
    "candidatesTokenCount": 396,
    "totalTokenCount": 404
  }
}

后续步骤