BigQuery データを使用して Gemini でバッチ予測を行う

BigQuery データソースを入力として使用し、Gemini でバッチテキスト予測を実行します。

もっと見る

このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。

コードサンプル

Go

このサンプルを試す前に、Vertex AI クイックスタート: クライアント ライブラリの使用にある Go の設定手順を完了してください。 詳細については、Vertex AI Go API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

// batchPredictBQ submits a batch prediction job using BigQuery data source as its input
func batchPredictBQ(w io.Writer, projectID, location string, inputURI string, outputURI string) error {
	// location  := "us-central1"
	// inputURI  := "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"
	// outputURI := "bq://<cloud-project-name>.<dataset-name>.<table-name>"
	modelName := "gemini-1.5-pro-002"
	jobName := "batch-predict-bq-test-001"

	ctx := context.Background()
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)
	client, err := aiplatform.NewJobClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		return fmt.Errorf("unable to create aiplatform client: %w", err)
	}
	defer client.Close()

	modelParameters, err := structpb.NewValue(map[string]interface{}{
		"temperature":     0.2,
		"maxOutputTokens": 200,
	})
	if err != nil {
		return fmt.Errorf("unable to convert model parameters to protobuf value: %w", err)
	}

	req := &aiplatformpb.CreateBatchPredictionJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		BatchPredictionJob: &aiplatformpb.BatchPredictionJob{
			DisplayName:     jobName,
			Model:           fmt.Sprintf("publishers/google/models/%s", modelName),
			ModelParameters: modelParameters,
			// Check the API reference for `BatchPredictionJob` for supported input and output formats:
			// https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.BatchPredictionJob
			InputConfig: &aiplatformpb.BatchPredictionJob_InputConfig{
				Source: &aiplatformpb.BatchPredictionJob_InputConfig_BigquerySource{
					BigquerySource: &aiplatformpb.BigQuerySource{
						InputUri: inputURI,
					},
				},
				InstancesFormat: "bigquery",
			},

			OutputConfig: &aiplatformpb.BatchPredictionJob_OutputConfig{
				Destination: &aiplatformpb.BatchPredictionJob_OutputConfig_BigqueryDestination{
					BigqueryDestination: &aiplatformpb.BigQueryDestination{
						OutputUri: outputURI,
					},
				},
				PredictionsFormat: "bigquery",
			},
		},
	}

	job, err := client.CreateBatchPredictionJob(ctx, req)
	if err != nil {
		return err
	}
	fullJobId := job.GetName()
	fmt.Fprintf(w, "submitted batch predict job for model %q\n", job.GetModel())
	fmt.Fprintf(w, "job id: %q\n", fullJobId)
	fmt.Fprintf(w, "job state: %s\n", job.GetState())
	// Example response:
	// submitted batch predict job for model "publishers/google/models/gemini-1.5-pro-002"
	// job id: "projects/.../locations/.../batchPredictionJobs/1234567890000000000"
	// job state: JOB_STATE_PENDING

	for {
		time.Sleep(5 * time.Second)

		job, err := client.GetBatchPredictionJob(ctx, &aiplatformpb.GetBatchPredictionJobRequest{
			Name: fullJobId,
		})
		if err != nil {
			return fmt.Errorf("error: couldn't get updated job state: %w", err)
		}

		if job.GetEndTime() != nil {
			fmt.Fprintf(w, "batch predict job finished with state %s\n", job.GetState())
			break
		} else {
			fmt.Fprintf(w, "batch predict job is running... job state is %s\n", job.GetState())
		}
	}

	return nil
}

Java

このサンプルを試す前に、Vertex AI クイックスタート: クライアント ライブラリの使用にある Java の設定手順を完了してください。詳細については、Vertex AI Java API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.cloud.aiplatform.v1.BatchPredictionJob;
import com.google.cloud.aiplatform.v1.BigQueryDestination;
import com.google.cloud.aiplatform.v1.BigQuerySource;
import com.google.cloud.aiplatform.v1.JobServiceClient;
import com.google.cloud.aiplatform.v1.JobServiceSettings;
import com.google.cloud.aiplatform.v1.LocationName;
import java.io.IOException;

public class CreateBatchPredictionGeminiBigqueryJobSample {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Update these variables before running the sample.
    String project = "PROJECT_ID";
    String bigqueryDestinationOutputUri = "bq://PROJECT_ID.MY_DATASET.MY_TABLE";

    createBatchPredictionGeminiBigqueryJobSample(project, bigqueryDestinationOutputUri);
  }

  // Create a batch prediction job using BigQuery input and output datasets.
  public static BatchPredictionJob createBatchPredictionGeminiBigqueryJobSample(
      String project, String bigqueryDestinationOutputUri) throws IOException {
    String location = "us-central1";
    JobServiceSettings settings =
        JobServiceSettings.newBuilder()
            .setEndpoint(String.format("%s-aiplatform.googleapis.com:443", location))
            .build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (JobServiceClient client = JobServiceClient.create(settings)) {
      BigQuerySource bigquerySource =
          BigQuerySource.newBuilder()
              .setInputUri("bq://storage-samples.generative_ai.batch_requests_for_multimodal_input")
              .build();
      BatchPredictionJob.InputConfig inputConfig =
          BatchPredictionJob.InputConfig.newBuilder()
              .setInstancesFormat("bigquery")
              .setBigquerySource(bigquerySource)
              .build();
      BigQueryDestination bigqueryDestination =
          BigQueryDestination.newBuilder().setOutputUri(bigqueryDestinationOutputUri).build();
      BatchPredictionJob.OutputConfig outputConfig =
          BatchPredictionJob.OutputConfig.newBuilder()
              .setPredictionsFormat("bigquery")
              .setBigqueryDestination(bigqueryDestination)
              .build();
      String modelName =
          String.format(
              "projects/%s/locations/%s/publishers/google/models/%s",
              project, location, "gemini-1.5-flash-002");

      BatchPredictionJob batchPredictionJob =
          BatchPredictionJob.newBuilder()
              .setDisplayName("my-display-name")
              .setModel(modelName) // Add model parameters per request in the input BigQuery table.
              .setInputConfig(inputConfig)
              .setOutputConfig(outputConfig)
              .build();

      LocationName parent = LocationName.of(project, location);
      BatchPredictionJob response = client.createBatchPredictionJob(parent, batchPredictionJob);
      System.out.format("\tName: %s\n", response.getName());
      // Example response:
      //   Name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
      return response;
    }
  }
}

Node.js

このサンプルを試す前に、Vertex AI クイックスタート: クライアント ライブラリの使用にある Node.js の設定手順を完了してください。詳細については、Vertex AI Node.js API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

// Import the aiplatform library
const aiplatformLib = require('@google-cloud/aiplatform');
const aiplatform = aiplatformLib.protos.google.cloud.aiplatform.v1;

/**
 * TODO(developer):  Uncomment/update these variables before running the sample.
 */
// projectId = 'YOUR_PROJECT_ID';
// URI of the output BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
// outputUri = 'bq://projectid.dataset.table';

// URI of the multimodal input BigQuery table.
// E.g. "bq://[PROJECT].[DATASET].[TABLE]"
const inputUri =
  'bq://storage-samples.generative_ai.batch_requests_for_multimodal_input';
const location = 'us-central1';
const parent = `projects/${projectId}/locations/${location}`;
const modelName = `${parent}/publishers/google/models/gemini-1.5-flash-002`;

// Specify the location of the api endpoint.
const clientOptions = {
  apiEndpoint: `${location}-aiplatform.googleapis.com`,
};

// Instantiate the client.
const jobServiceClient = new aiplatformLib.JobServiceClient(clientOptions);

// Create a Gemini batch prediction job using BigQuery input and output datasets.
async function create_batch_prediction_gemini_bq() {
  const bqSource = new aiplatform.BigQuerySource({
    inputUri: inputUri,
  });

  const inputConfig = new aiplatform.BatchPredictionJob.InputConfig({
    bigquerySource: bqSource,
    instancesFormat: 'bigquery',
  });

  const bqDestination = new aiplatform.BigQueryDestination({
    outputUri: outputUri,
  });

  const outputConfig = new aiplatform.BatchPredictionJob.OutputConfig({
    bigqueryDestination: bqDestination,
    predictionsFormat: 'bigquery',
  });

  const batchPredictionJob = new aiplatform.BatchPredictionJob({
    displayName: 'Batch predict with Gemini - BigQuery',
    model: modelName, // Add model parameters per request in the input BigQuery table.
    inputConfig: inputConfig,
    outputConfig: outputConfig,
  });

  const request = {
    parent: parent,
    batchPredictionJob,
  };

  // Create batch prediction job request
  const [response] = await jobServiceClient.createBatchPredictionJob(request);
  console.log('Response name: ', response.name);
  // Example response:
  // Response name: projects/<project>/locations/us-central1/batchPredictionJobs/<job-id>
}

await create_batch_prediction_gemini_bq();

Python

このサンプルを試す前に、Vertex AI クイックスタート: クライアント ライブラリの使用にある Python の設定手順を完了してください。詳細については、Vertex AI Python API のリファレンス ドキュメントをご覧ください。

Vertex AI に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import time
import vertexai

from vertexai.batch_prediction import BatchPredictionJob

# TODO(developer): Update and un-comment below line
# PROJECT_ID = "your-project-id"

# Initialize vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")

input_uri = "bq://storage-samples.generative_ai.batch_requests_for_multimodal_input"

# Submit a batch prediction job with Gemini model
batch_prediction_job = BatchPredictionJob.submit(
    source_model="gemini-1.5-flash-002",
    input_dataset=input_uri,
    output_uri_prefix=output_uri,
)

# Check job status
print(f"Job resource name: {batch_prediction_job.resource_name}")
print(f"Model resource name with the job: {batch_prediction_job.model_name}")
print(f"Job state: {batch_prediction_job.state.name}")

# Refresh the job until complete
while not batch_prediction_job.has_ended:
    time.sleep(5)
    batch_prediction_job.refresh()

# Check if the job succeeds
if batch_prediction_job.has_succeeded:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

# Check the location of the output
print(f"Job output location: {batch_prediction_job.output_location}")

# Example response:
#  Job output location: bq://Project-ID/gen-ai-batch-prediction/predictions-model-year-month-day-hour:minute:second.12345

次のステップ

他の Google Cloud プロダクトに関連するコードサンプルを検索およびフィルタするには、Google Cloud サンプル ブラウザをご覧ください。