BigQuery Migration API 客户端库

本页面介绍了如何使用 BigQuery Migration API 的 Cloud 客户端库。借助客户端库,您可以更轻松地通过支持的语言访问 Google Cloud API。虽然您可以通过向服务器发出原始请求来直接使用 Google Cloud API,但客户端库简化了这一过程,可显著减少您需要编写的代码量。

请参阅客户端库说明,详细了解 Cloud 客户端库和旧版 Google API 客户端库。

安装客户端库

Go

go get cloud.google.com/go/bigquery

如需了解详情,请参阅设置 Go 开发环境。

Java

如需了解详情,请参阅设置 Java 开发环境。

Python

pip install --upgrade google-cloud-bigquery-migration

如需了解详情,请参阅设置 Python 开发环境。

设置身份验证

为了对 Google Cloud API 的调用进行身份验证,客户端库支持应用默认凭据 (ADC);这些库会在一组指定的位置查找凭据,并使用这些凭据对发送到 API 的请求进行身份验证。借助 ADC,您可以在各种环境(例如本地开发或生产环境)中为您的应用提供凭据,而无需修改应用代码。

对于生产环境,设置 ADC 的方式取决于服务和上下文。如需了解详情,请参阅设置应用默认凭据。

对于本地开发环境,您可以使用与您的 Google 账号关联的凭据设置 ADC:

  1. 安装并初始化 gcloud CLI

    初始化 gcloud CLI 时,请务必指定一个 Google Cloud 项目,并确保您在该项目中有权访问应用所需的资源。

  2. 创建凭据文件:

    gcloud auth application-default login

    登录屏幕随即出现。在您登录后,您的凭据会存储在 ADC 使用的本地凭据文件中。

使用客户端库

以下示例演示了与 BigQuery Migration API 的一些基本交互。

Go

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The bigquery_migration_quickstart application demonstrates basic usage of the
// BigQuery migration API by executing a workflow that performs a batch SQL
// translation task.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	migration "cloud.google.com/go/bigquery/migration/apiv2"
	"cloud.google.com/go/bigquery/migration/apiv2/migrationpb"
)

// main parses the command line flags, creates a BigQuery Migration client,
// runs the example batch SQL translation workflow, and prints a summary of
// the finished workflow. Any validation or API failure terminates the
// process with a non-zero exit status.
func main() {
	// Define command line flags for controlling the behavior of this quickstart.
	projectID := flag.String("project_id", "", "Cloud Project ID.")
	location := flag.String("location", "us", "BigQuery Migration location used for interactions.")
	outputPath := flag.String("output", "", "Cloud Storage path for translated resources.")
	// Parse flags and do some minimal validation.
	flag.Parse()
	if *projectID == "" {
		log.Fatal("empty --project_id specified, please provide a valid project ID")
	}
	if *location == "" {
		log.Fatal("empty --location specified, please provide a valid location")
	}
	if *outputPath == "" {
		log.Fatal("empty --output specified, please provide a valid cloud storage path")
	}

	ctx := context.Background()
	migClient, err := migration.NewClient(ctx)
	if err != nil {
		log.Fatalf("migration.NewClient: %v", err)
	}

	workflow, err := executeTranslationWorkflow(ctx, migClient, *projectID, *location, *outputPath)

	// Close the client explicitly instead of deferring it: log.Fatalf exits
	// the process via os.Exit, which does not run deferred functions, so a
	// deferred Close would be skipped on the failure path below.
	if closeErr := migClient.Close(); closeErr != nil {
		log.Printf("migClient.Close: %v", closeErr)
	}
	if err != nil {
		log.Fatalf("workflow execution failed: %v", err)
	}

	reportWorkflowStatus(workflow)
}

// executeTranslationWorkflow constructs a migration workflow that performs batch SQL translation,
// submits it, and then polls the workflow until it reaches the COMPLETED state.
//
// The workflow is created under projects/<projectID>/locations/<location> and contains a single
// Teradata-to-BigQuery translation task that reads from a public sample bucket and writes the
// translated output to outPath.
//
// It returns the most recently polled workflow on success. It returns an error if the workflow
// cannot be created, if polling fails, or if the workflow has not completed before ctx is
// cancelled or the five minute polling timeout elapses.
func executeTranslationWorkflow(ctx context.Context, client *migration.Client, projectID, location, outPath string) (*migrationpb.MigrationWorkflow, error) {

	// Construct the workflow creation request.  In this workflow, we have only a single translation task present.
	req := &migrationpb.CreateMigrationWorkflowRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		MigrationWorkflow: &migrationpb.MigrationWorkflow{
			DisplayName: "example SQL conversion",
			Tasks: map[string]*migrationpb.MigrationTask{
				"example_conversion": {
					Type: "Translation_Teradata2BQ",
					TaskDetails: &migrationpb.MigrationTask_TranslationConfigDetails{
						TranslationConfigDetails: &migrationpb.TranslationConfigDetails{
							SourceLocation: &migrationpb.TranslationConfigDetails_GcsSourcePath{
								GcsSourcePath: "gs://cloud-samples-data/bigquery/migration/translation/input/",
							},
							TargetLocation: &migrationpb.TranslationConfigDetails_GcsTargetPath{
								GcsTargetPath: outPath,
							},
							SourceDialect: &migrationpb.Dialect{
								DialectValue: &migrationpb.Dialect_TeradataDialect{
									TeradataDialect: &migrationpb.TeradataDialect{
										Mode: migrationpb.TeradataDialect_SQL,
									},
								},
							},
							TargetDialect: &migrationpb.Dialect{
								DialectValue: &migrationpb.Dialect_BigqueryDialect{},
							},
						},
					},
				},
			},
		},
	}

	// Create the workflow using the request.
	workflow, err := client.CreateMigrationWorkflow(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("CreateMigrationWorkflow: %w", err)
	}
	fmt.Printf("workflow created: %s\n", workflow.GetName())

	// This is an asynchronous process, so we now poll the workflow
	// until completion or a suitable timeout has elapsed.
	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	// A ticker paces the polling; waiting on it inside a select (rather than
	// calling time.Sleep) lets cancellation or the timeout interrupt the wait
	// immediately.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		polledWorkflow, err := client.GetMigrationWorkflow(timeoutCtx, &migrationpb.GetMigrationWorkflowRequest{
			Name: workflow.GetName(),
		})
		if err != nil {
			return nil, fmt.Errorf("polling ended in error: %w", err)
		}
		if polledWorkflow.GetState() == migrationpb.MigrationWorkflow_COMPLETED {
			// polledWorkflow contains the most recent metadata about the workflow, so we return that.
			return polledWorkflow, nil
		}

		// The workflow still isn't complete, so wait briefly before polling again.
		select {
		case <-timeoutCtx.Done():
			return nil, fmt.Errorf("workflow %s didn't complete due to context expiring: %w", workflow.GetName(), timeoutCtx.Err())
		case <-ticker.C:
		}
	}
}

// reportWorkflowStatus writes a human readable summary of the workflow to
// standard output: one header line with the workflow name and final state,
// followed by one line per task giving its map key, its id, and the reason
// of its processing error when one is present.
func reportWorkflowStatus(workflow *migrationpb.MigrationWorkflow) {
	name, state := workflow.GetName(), workflow.GetState().String()
	fmt.Printf("Migration workflow %s ended in state %s.\n", name, state)

	for taskKey, t := range workflow.GetTasks() {
		// Assemble the full line first so each task is emitted with a single print.
		line := fmt.Sprintf(" - Task %s had id %s", taskKey, t.GetId())
		if procErr := t.GetProcessingError(); procErr != nil {
			line += fmt.Sprintf(" with processing error: %s", procErr.GetReason())
		}
		fmt.Println(line)
	}
}

Python

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def create_migration_workflow(
    gcs_input_path: str, gcs_output_path: str, project_id: str
) -> None:
    """Create a Batch SQL Translation migration workflow and print the result.

    The workflow is created in the "us" location of the given project and
    contains a single Teradata SQL to BigQuery translation task.

    Args:
        gcs_input_path: Value passed as the translation's gcs_source_path.
        gcs_output_path: Value passed as the translation's gcs_target_path.
        project_id: ID of the project under which the workflow is created.
    """

    from google.cloud import bigquery_migration_v2 as bqm

    # Source side of the translation: Teradata in SQL mode.
    teradata_source = bqm.Dialect()
    teradata_source.teradata_dialect = bqm.TeradataDialect(
        mode=bqm.TeradataDialect.Mode.SQL
    )

    # Target side of the translation: BigQuery.
    bigquery_target = bqm.Dialect()
    bigquery_target.bigquery_dialect = bqm.BigQueryDialect()

    # Translation configuration tying the paths and dialects together.
    config_details = bqm.TranslationConfigDetails(
        gcs_source_path=gcs_input_path,
        gcs_target_path=gcs_output_path,
        source_dialect=teradata_source,
        target_dialect=bigquery_target,
    )

    # The single task carried by the workflow.
    task = bqm.MigrationTask(
        type_="Translation_Teradata2BQ",
        translation_config_details=config_details,
    )

    # The workflow itself, with the task registered under a fixed key.
    migration_workflow = bqm.MigrationWorkflow(
        display_name="demo-workflow-python-example-Teradata2BQ"
    )
    migration_workflow.tasks["translation-task"] = task  # type: ignore

    # Build the creation request and send it with a BigQuery Migration client.
    create_request = bqm.CreateMigrationWorkflowRequest(
        parent=f"projects/{project_id}/locations/us",
        migration_workflow=migration_workflow,
    )
    migration_client = bqm.MigrationServiceClient()
    created = migration_client.create_migration_workflow(request=create_request)

    print("Created workflow:")
    print(created.display_name)
    print("Current state:")
    print(created.State(created.state))

其他资源

Go

以下列表包含与 Go 版客户端库相关的更多资源的链接:

Java

以下列表包含与 Java 版客户端库相关的更多资源的链接:

Python

以下列表包含与 Python 版客户端库相关的更多资源的链接:

后续步骤

如需详细了解背景信息,请参阅 BigQuery Migration Service 简介页面。