BigQuery Migration API 클라이언트 라이브러리

이 페이지에서는 BigQuery Migration API용 Cloud 클라이언트 라이브러리를 시작하는 방법을 설명합니다. 클라이언트 라이브러리를 사용하면 지원되는 언어로 Google Cloud API에 쉽게 액세스할 수 있습니다. 원시 요청을 서버에 보내 Google Cloud API를 직접 사용할 수 있지만 클라이언트 라이브러리는 작성해야 하는 코드 양을 크게 줄여 주는 간소화 기능을 제공합니다.

클라이언트 라이브러리 설명에서 Cloud 클라이언트 라이브러리 및 이전 Google API 클라이언트 라이브러리에 대해 자세히 알아보세요.

클라이언트 라이브러리 설치

Go

go get cloud.google.com/go/bigquery

자세한 내용은 Go 개발 환경 설정을 참조하세요.

Java

자세한 내용은 자바 개발 환경 설정을 참조하세요.

Python

pip install --upgrade google-cloud-bigquery-migration

자세한 내용은 Python 개발 환경 설정을 참조하세요.

인증 설정

Google Cloud API 호출을 인증하기 위해 클라이언트 라이브러리는 애플리케이션 기본 사용자 인증 정보(ADC)를 지원합니다. 라이브러리가 정의된 위치 집합에서 사용자 인증 정보를 찾고 이러한 사용자 인증 정보를 사용하여 API에 대한 요청을 인증합니다. ADC를 사용하면 애플리케이션 코드를 수정할 필요 없이 로컬 개발 또는 프로덕션과 같은 다양한 환경에서 애플리케이션에 사용자 인증 정보를 제공할 수 있습니다.

프로덕션 환경에서 ADC를 설정하는 방법은 서비스와 컨텍스트에 따라 다릅니다. 자세한 내용은 애플리케이션 기본 사용자 인증 정보 설정을 참조하세요.

로컬 개발 환경의 경우 Google 계정과 연결된 사용자 인증 정보를 사용하여 ADC를 설정할 수 있습니다.

  1. gcloud CLI를 설치하고 초기화합니다.

    gcloud CLI를 초기화할 때 애플리케이션에 필요한 리소스에 액세스할 수 있는 권한이 있는 Google Cloud 프로젝트를 지정해야 합니다.

  2. 사용자 인증 정보 파일을 만듭니다.

    gcloud auth application-default login

    로그인 화면이 표시됩니다. 로그인 후 사용자 인증 정보는 ADC에 사용하는 로컬 사용자 인증 정보 파일에 저장됩니다.

클라이언트 라이브러리 사용

다음 예시는 BigQuery Migration API와의 몇 가지 기본적인 상호작용을 보여줍니다.

Go

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The bigquery_migration_quickstart application demonstrates basic usage of the
// BigQuery migration API by executing a workflow that performs a batch SQL
// translation task.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	migration "cloud.google.com/go/bigquery/migration/apiv2"
	"cloud.google.com/go/bigquery/migration/apiv2/migrationpb"
)

func main() {
	// Define command line flags for controlling the behavior of this quickstart.
	projectID := flag.String("project_id", "", "Cloud Project ID.")
	location := flag.String("location", "us", "BigQuery Migration location used for interactions.")
	outputPath := flag.String("output", "", "Cloud Storage path for translated resources.")
	// Parse flags and do some minimal validation.
	flag.Parse()
	if *projectID == "" {
		log.Fatal("empty --project_id specified, please provide a valid project ID")
	}
	if *location == "" {
		log.Fatal("empty --location specified, please provide a valid location")
	}
	if *outputPath == "" {
		log.Fatalf("empty --output specified, please provide a valid cloud storage path")
	}

	ctx := context.Background()
	migClient, err := migration.NewClient(ctx)
	if err != nil {
		log.Fatalf("migration.NewClient: %v", err)
	}
	defer migClient.Close()

	workflow, err := executeTranslationWorkflow(ctx, migClient, *projectID, *location, *outputPath)
	if err != nil {
		log.Fatalf("workflow execution failed: %v\n", err)
	}

	reportWorkflowStatus(workflow)
}

// executeTranslationWorkflow constructs a migration workflow that performs batch SQL translation.
func executeTranslationWorkflow(ctx context.Context, client *migration.Client, projectID, location, outPath string) (*migrationpb.MigrationWorkflow, error) {

	// Construct the workflow creation request.  In this workflow, we have only a single translation task present.
	req := &migrationpb.CreateMigrationWorkflowRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, location),
		MigrationWorkflow: &migrationpb.MigrationWorkflow{
			DisplayName: "example SQL conversion",
			Tasks: map[string]*migrationpb.MigrationTask{
				"example_conversion": {
					Type: "Translation_Teradata2BQ",
					TaskDetails: &migrationpb.MigrationTask_TranslationConfigDetails{
						TranslationConfigDetails: &migrationpb.TranslationConfigDetails{
							SourceLocation: &migrationpb.TranslationConfigDetails_GcsSourcePath{
								GcsSourcePath: "gs://cloud-samples-data/bigquery/migration/translation/input/",
							},
							TargetLocation: &migrationpb.TranslationConfigDetails_GcsTargetPath{
								GcsTargetPath: outPath,
							},
							SourceDialect: &migrationpb.Dialect{
								DialectValue: &migrationpb.Dialect_TeradataDialect{
									TeradataDialect: &migrationpb.TeradataDialect{
										Mode: migrationpb.TeradataDialect_SQL,
									},
								},
							},
							TargetDialect: &migrationpb.Dialect{
								DialectValue: &migrationpb.Dialect_BigqueryDialect{},
							},
						},
					},
				},
			},
		},
	}

	// Create the workflow using the request.
	workflow, err := client.CreateMigrationWorkflow(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("CreateMigrationWorkflow: %w", err)
	}
	fmt.Printf("workflow created: %s", workflow.GetName())

	// This is an asyncronous process, so we now poll the workflow
	// until completion or a suitable timeout has elapsed.
	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()
	for {
		select {
		case <-timeoutCtx.Done():
			return nil, fmt.Errorf("task %s didn't complete due to context expiring", workflow.GetName())
		default:
			polledWorkflow, err := client.GetMigrationWorkflow(timeoutCtx, &migrationpb.GetMigrationWorkflowRequest{
				Name: workflow.GetName(),
			})
			if err != nil {
				return nil, fmt.Errorf("polling ended in error: %w", err)
			}
			if polledWorkflow.GetState() == migrationpb.MigrationWorkflow_COMPLETED {
				// polledWorkflow contains the most recent metadata about the workflow, so we return that.
				return polledWorkflow, nil
			}
			// workflow still isn't complete, so sleep briefly before polling again.
			time.Sleep(5 * time.Second)
		}
	}
}

// reportWorkflowStatus prints information about the workflow execution in a more human readable format.
func reportWorkflowStatus(workflow *migrationpb.MigrationWorkflow) {
	fmt.Printf("Migration workflow %s ended in state %s.\n", workflow.GetName(), workflow.GetState().String())
	for k, task := range workflow.GetTasks() {
		fmt.Printf(" - Task %s had id %s", k, task.GetId())
		if task.GetProcessingError() != nil {
			fmt.Printf(" with processing error: %s", task.GetProcessingError().GetReason())
		}
		fmt.Println()
	}
}

Python

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def create_migration_workflow(
    gcs_input_path: str, gcs_output_path: str, project_id: str
) -> None:
    """Creates a migration workflow of a Batch SQL Translation and prints the response."""

    from google.cloud import bigquery_migration_v2

    parent = f"projects/{project_id}/locations/us"

    # Construct a BigQuery Migration client object.
    client = bigquery_migration_v2.MigrationServiceClient()

    # Set the source dialect to Teradata SQL.
    source_dialect = bigquery_migration_v2.Dialect()
    source_dialect.teradata_dialect = bigquery_migration_v2.TeradataDialect(
        mode=bigquery_migration_v2.TeradataDialect.Mode.SQL
    )

    # Set the target dialect to BigQuery dialect.
    target_dialect = bigquery_migration_v2.Dialect()
    target_dialect.bigquery_dialect = bigquery_migration_v2.BigQueryDialect()

    # Prepare the config proto.
    translation_config = bigquery_migration_v2.TranslationConfigDetails(
        gcs_source_path=gcs_input_path,
        gcs_target_path=gcs_output_path,
        source_dialect=source_dialect,
        target_dialect=target_dialect,
    )

    # Prepare the task.
    migration_task = bigquery_migration_v2.MigrationTask(
        type_="Translation_Teradata2BQ", translation_config_details=translation_config
    )

    # Prepare the workflow.
    workflow = bigquery_migration_v2.MigrationWorkflow(
        display_name="demo-workflow-python-example-Teradata2BQ"
    )

    workflow.tasks["translation-task"] = migration_task  # type: ignore

    # Prepare the API request to create a migration workflow.
    request = bigquery_migration_v2.CreateMigrationWorkflowRequest(
        parent=parent,
        migration_workflow=workflow,
    )

    response = client.create_migration_workflow(request=request)

    print("Created workflow:")
    print(response.display_name)
    print("Current state:")
    print(response.State(response.state))

추가 리소스

Go

다음 목록에는 Go용 클라이언트 라이브러리와 관련된 추가 리소스에 대한 링크가 포함되어 있습니다.

Java

다음 목록에는 자바용 클라이언트 라이브러리와 관련된 추가 리소스에 대한 링크가 포함되어 있습니다.

Python

다음 목록에는 Python용 클라이언트 라이브러리와 관련된 추가 리소스에 대한 링크가 포함되어 있습니다.

다음 단계

자세한 배경 정보는 BigQuery 마이그레이션 서비스 소개 페이지를 참조하세요.