以编程方式创建和管理数据转移作业

本页面介绍了如何通过 REST API 直接使用 Storage Transfer Service,以及如何利用 Java 和 Python 以编程方式在两种常见场景中使用该组件。要使用 Google Cloud Console 创建转移作业,请参阅使用 Console 创建和管理转移

使用 Storage Transfer API 以编程方式配置或编辑传输作业时,时间采用 UTC。如需详细了解如何指定传输作业的时间表,请参阅时间表

前期准备

在 Storage Transfer Service 中设置转移作业之前,请确保有必要的访问权限:

  • Storage Transfer Service 访问权限:您必须获得以下角色之一:

    • roles/owner
    • roles/editor
    • roles/storagetransfer.admin
    • roles/storagetransfer.user
    • 包含至少 roles/storagetransfer.user 权限的自定义角色。

      有关添加和查看项目级层权限的详情,请参阅使用 IAM 权限处理项目

    如需详细了解 Storage Transfer Service 中的 IAM 角色和权限,请参阅使用 IAM 角色和权限进行访问权限控制

  • 数据源和数据接收器访问权限:Storage Transfer Service 使用服务帐号执行传输。要访问数据源和数据接收器,此服务帐号必须具有数据源权限数据接收器权限

将数据从 Amazon S3 传输到 Cloud Storage

本示例演示了如何将文件从 Amazon S3 传输到 Cloud Storage 存储分区。请务必查看配置访问权限价格信息,以了解将数据从 Amazon S3 转移到 Cloud Storage 的影响。

创建转移作业

创建转移作业时,请勿在 Amazon S3 存储分区来源名称中为 bucketName 添加 s3:// 前缀。

REST

使用 transferJobs create 发出请求:
POST https://storagetransfer.googleapis.com/v1/transferJobs
{
    "description": "YOUR DESCRIPTION",
    "status": "ENABLED",
    "projectId": "PROJECT_ID",
    "schedule": {
        "scheduleStartDate": {
            "day": 1,
            "month": 1,
            "year": 2015
        },
        "scheduleEndDate": {
            "day": 1,
            "month": 1,
            "year": 2015
        },
        "startTimeOfDay": {
            "hours": 1,
            "minutes": 1
        }
    },
    "transferSpec": {
        "awsS3DataSource": {
            "bucketName": "AWS_SOURCE_NAME",
            "awsAccessKey": {
                "accessKeyId": "AWS_ACCESS_KEY_ID",
                "secretAccessKey": "AWS_SECRET_ACCESS_KEY"
            }
        },
        "gcsDataSink": {
            "bucketName": "GCS_SINK_NAME"
        }
    }
}
响应:
200 OK
{
    "transferJob": [
        {
            "creationTime": "2015-01-01T01:01:00.000000000Z",
            "description": "YOUR DESCRIPTION",
            "name": "transferJobs/JOB_ID",
            "status": "ENABLED",
            "lastModificationTime": "2015-01-01T01:01:00.000000000Z",
            "projectId": "PROJECT_ID",
            "schedule": {
                "scheduleStartDate": {
                    "day": 1,
                    "month": 1,
                    "year": 2015
                },
                "scheduleEndDate": {
                    "day": 1,
                    "month": 1,
                    "year": 2015
                },
                "startTimeOfDay": {
                    "hours": 1,
                    "minutes": 1
                }
            },
            "transferSpec": {
                "awsS3DataSource": {
                    "bucketName": "AWS_SOURCE_NAME"
                },
                "gcsDataSink": {
                    "bucketName": "GCS_SINK_NAME"
                },
                "objectConditions": {},
                "transferOptions": {}
            }
        }
    ]
}

Java

如需了解如何创建 Storage Transfer Service 客户端,请参阅为 Google API 库创建客户端


package com.google.cloud.storage.storagetransfer.samples;

import com.google.api.services.storagetransfer.v1.Storagetransfer;
import com.google.api.services.storagetransfer.v1.model.AwsAccessKey;
import com.google.api.services.storagetransfer.v1.model.AwsS3Data;
import com.google.api.services.storagetransfer.v1.model.Date;
import com.google.api.services.storagetransfer.v1.model.GcsData;
import com.google.api.services.storagetransfer.v1.model.Schedule;
import com.google.api.services.storagetransfer.v1.model.TimeOfDay;
import com.google.api.services.storagetransfer.v1.model.TransferJob;
import com.google.api.services.storagetransfer.v1.model.TransferSpec;
import java.io.IOException;
import java.io.PrintStream;

/** Creates a one-off transfer job from Amazon S3 to Google Cloud Storage. */
public final class AwsRequester {
  /**
   * Creates and executes a request for a TransferJob from Amazon S3 to Cloud Storage.
   *
   * The {@code startDate} and {@code startTime} parameters should be set according to the UTC
   * Time Zone. See:
   * https://developers.google.com/resources/api-libraries/documentation/storagetransfer/v1/java/latest/com/google/api/services/storagetransfer/v1/model/Schedule.html#getStartTimeOfDay()
   *
   * @return the response TransferJob if the request is successful
   * @throws InstantiationException if instantiation fails when building the TransferJob
   * @throws IllegalAccessException if an illegal access occurs when building the TransferJob
   * @throws IOException if the client failed to complete the request
   */
  public static TransferJob createAwsTransferJob(
      String projectId,
      String jobDescription,
      String awsSourceBucket,
      String gcsSinkBucket,
      String startDate,
      String startTime,
      String awsAccessKeyId,
      String awsSecretAccessKey)
      throws InstantiationException, IllegalAccessException, IOException {
    Date date = TransferJobUtils.createDate(startDate);
    TimeOfDay time = TransferJobUtils.createTimeOfDay(startTime);
    TransferJob transferJob =
        new TransferJob()
            .setDescription(jobDescription)
            .setProjectId(projectId)
            .setTransferSpec(
                new TransferSpec()
                    .setAwsS3DataSource(
                        new AwsS3Data()
                            .setBucketName(awsSourceBucket)
                            .setAwsAccessKey(
                                new AwsAccessKey()
                                    .setAccessKeyId(awsAccessKeyId)
                                    .setSecretAccessKey(awsSecretAccessKey)))
                    .setGcsDataSink(new GcsData().setBucketName(gcsSinkBucket)))
            .setSchedule(
                new Schedule()
                    .setScheduleStartDate(date)
                    .setScheduleEndDate(date)
                    .setStartTimeOfDay(time))
            .setStatus("ENABLED");

    Storagetransfer client = TransferClientCreator.createStorageTransferClient();
    return client.transferJobs().create(transferJob).execute();
  }

  public static void run(PrintStream out)
      throws InstantiationException, IllegalAccessException, IOException {
    String projectId = TransferJobUtils.getPropertyOrFail("projectId");
    String jobDescription = TransferJobUtils.getPropertyOrFail("jobDescription");
    String awsSourceBucket = TransferJobUtils.getPropertyOrFail("awsSourceBucket");
    String gcsSinkBucket = TransferJobUtils.getPropertyOrFail("gcsSinkBucket");
    String startDate = TransferJobUtils.getPropertyOrFail("startDate");
    String startTime = TransferJobUtils.getPropertyOrFail("startTime");
    String awsAccessKeyId = TransferJobUtils.getEnvOrFail("AWS_ACCESS_KEY_ID");
    String awsSecretAccessKey = TransferJobUtils.getEnvOrFail("AWS_SECRET_ACCESS_KEY");

    TransferJob responseT =
        createAwsTransferJob(
            projectId,
            jobDescription,
            awsSourceBucket,
            gcsSinkBucket,
            startDate,
            startTime,
            awsAccessKeyId,
            awsSecretAccessKey);
    out.println("Return transferJob: " + responseT.toPrettyString());
  }

  /** Output the contents of a successfully created TransferJob. */
  public static void main(String[] args) {
    try {
      run(System.out);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Python

如需了解如何创建 Storage Transfer Service 客户端,请参阅为 Google API 库创建客户端

"""Command-line sample that creates a one-time transfer from Amazon S3 to
Google Cloud Storage.

This sample is used on this page:

    https://cloud.google.com/storage/transfer/create-transfer

For more information, see README.md.
"""

import argparse
import datetime
import json

import googleapiclient.discovery

def main(description, project_id, start_date, start_time, source_bucket,
         access_key_id, secret_access_key, sink_bucket):
    """Create a one-time transfer from Amazon S3 to Google Cloud Storage."""
    storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')

    # Edit this template with desired parameters.
    transfer_job = {
        'description': description,
        'status': 'ENABLED',
        'projectId': project_id,
        'schedule': {
            'scheduleStartDate': {
                'day': start_date.day,
                'month': start_date.month,
                'year': start_date.year
            },
            'scheduleEndDate': {
                'day': start_date.day,
                'month': start_date.month,
                'year': start_date.year
            },
            'startTimeOfDay': {
                'hours': start_time.hour,
                'minutes': start_time.minute,
                'seconds': start_time.second
            }
        },
        'transferSpec': {
            'awsS3DataSource': {
                'bucketName': source_bucket,
                'awsAccessKey': {
                    'accessKeyId': access_key_id,
                    'secretAccessKey': secret_access_key
                }
            },
            'gcsDataSink': {
                'bucketName': sink_bucket
            }
        }
    }

    result = storagetransfer.transferJobs().create(body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(result, indent=4)))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('description', help='Transfer description.')
    parser.add_argument('project_id', help='Your Google Cloud project ID.')
    parser.add_argument('start_date', help='Date YYYY/MM/DD.')
    parser.add_argument('start_time', help='UTC Time (24hr) HH:MM:SS.')
    parser.add_argument('source_bucket', help='AWS source bucket name.')
    parser.add_argument('access_key_id', help='Your AWS access key id.')
    parser.add_argument(
        'secret_access_key',
        help='Your AWS secret access key.'
    )
    parser.add_argument('sink_bucket', help='GCS sink bucket name.')

    args = parser.parse_args()
    start_date = datetime.datetime.strptime(args.start_date, '%Y/%m/%d')
    start_time = datetime.datetime.strptime(args.start_time, '%H:%M:%S')

    main(
        args.description,
        args.project_id,
        start_date,
        start_time,
        args.source_bucket,
        args.access_key_id,
        args.secret_access_key,
        args.sink_bucket)

在 Microsoft Azure Blob Storage 和 Cloud Storage 之间转移

本示例演示了如何将文件从 Microsoft Azure Storage 移至 Cloud Storage 存储分区。请务必查看配置访问权限价格信息,以了解将数据从 Microsoft Azure Storage 转移到 Cloud Storage 的影响。

REST

使用 transferJobs create 发出请求:
POST https://storagetransfer.googleapis.com/v1/transferJobs
{
    "description": "YOUR DESCRIPTION",
    "status": "ENABLED",
    "projectId": "PROJECT_ID",
    "schedule": {
        "scheduleStartDate": {
            "day": 14,
            "month": 2,
            "year": 2020
        },
        "scheduleEndDate": {
            "day": 14
            "month": 2,
            "year": 2020
        },
        "startTimeOfDay": {
            "hours": 1,
            "minutes": 1
        }
    },
    "transferSpec": {
        "azureBlobStorageDataSource": {
            "storageAccount": "AZURE_SOURCE_NAME",
            "azureCredentials": {
                "sasToken": "AZURE_SAS_TOKEN",
            },
            "container": "AZURE_CONTAINER",
        },
        "gcsDataSink": {
            "bucketName": "GCS_SINK_NAME"
        }
    }
}
响应:
200 OK
{
    "transferJob": [
        {
            "creationTime": "2020-02-14T01:01:00.000000000Z",
            "description": "YOUR DESCRIPTION",
            "name": "transferJobs/JOB_ID",
            "status": "ENABLED",
            "lastModificationTime": "2020-02-14T01:01:00.000000000Z",
            "projectId": "PROJECT_ID",
            "schedule": {
                "scheduleStartDate": {
                    "day": 14
                    "month": 2,
                    "year": 2020
                },
                "scheduleEndDate": {
                    "day": 14,
                    "month": 2,
                    "year": 2020
                },
                "startTimeOfDay": {
                    "hours": 1,
                    "minutes": 1
                }
            },
            "transferSpec": {
                "azureBlobStorageDataSource": {
                    "storageAccount": "AZURE_SOURCE_NAME",
                    "azureCredentials": {
                        "sasToken": "AZURE_SAS_TOKEN",
                    },
                    "container": "AZURE_CONTAINER",
                },
                "objectConditions": {},
                "transferOptions": {}
            }
        }
    ]
}

在 Cloud Storage 存储分区之间传输数据

本示例演示了如何将文件从一个 Cloud Storage 存储分区传输到另一个 Cloud Storage 存储分区。例如,您可以将数据复制到位于另一个位置的存储分区。

创建转移作业

REST

使用 transferJobs create 发出请求:
POST https://storagetransfer.googleapis.com/v1/transferJobs
{
    "description": "YOUR DESCRIPTION",
    "status": "ENABLED",
    "projectId": "PROJECT_ID",
    "schedule": {
        "scheduleStartDate": {
            "day": 1,
            "month": 1,
            "year": 2015
        },
        "startTimeOfDay": {
            "hours": 1,
            "minutes": 1
        }
    },
    "transferSpec": {
        "gcsDataSource": {
            "bucketName": "GCS_SOURCE_NAME"
        },
        "gcsDataSink": {
            "bucketName": "GCS_NEARLINE_SINK_NAME"
        },
        "objectConditions": {
            "minTimeElapsedSinceLastModification": "2592000s"
        },
        "transferOptions": {
            "deleteObjectsFromSourceAfterTransfer": true
        }
    }
}
响应:
200 OK
{
    "transferJob": [
        {
            "creationTime": "2015-01-01T01:01:00.000000000Z",
            "description": "YOUR DESCRIPTION",
            "name": "transferJobs/JOB_ID",
            "status": "ENABLED",
            "lastModificationTime": "2015-01-01T01:01:00.000000000Z",
            "projectId": "PROJECT_ID",
            "schedule": {
                "scheduleStartDate": {
                    "day": 1,
                    "month": 1,
                    "year": 2015
                },
                "startTimeOfDay": {
                    "hours": 1,
                    "minutes": 1
                }
            },
            "transferSpec": {
                "gcsDataSource": {
                    "bucketName": "GCS_SOURCE_NAME",
                },
                "gcsDataSink": {
                    "bucketName": "GCS_NEARLINE_SINK_NAME"
                },
                "objectConditions": {
                    "minTimeElapsedSinceLastModification": "2592000.000s"
                },
                "transferOptions": {
                    "deleteObjectsFromSourceAfterTransfer": true
                }
            }
        }
    ]
}

Java

如需了解如何创建 Storage Transfer Service 客户端,请参阅为 Google API 库创建客户端


package com.google.cloud.storage.storagetransfer.samples;

import com.google.api.services.storagetransfer.v1.Storagetransfer;
import com.google.api.services.storagetransfer.v1.model.Date;
import com.google.api.services.storagetransfer.v1.model.GcsData;
import com.google.api.services.storagetransfer.v1.model.ObjectConditions;
import com.google.api.services.storagetransfer.v1.model.Schedule;
import com.google.api.services.storagetransfer.v1.model.TimeOfDay;
import com.google.api.services.storagetransfer.v1.model.TransferJob;
import com.google.api.services.storagetransfer.v1.model.TransferOptions;
import com.google.api.services.storagetransfer.v1.model.TransferSpec;
import java.io.IOException;
import java.io.PrintStream;

/**
 * Creates a daily transfer from a standard Cloud Storage bucket to a Cloud Storage Nearline bucket
 * for files untouched for 30 days.
 */
public final class NearlineRequester {

  /**
   * Creates and executes a request for a TransferJob to Cloud Storage Nearline.
   *
   * The {@code startDate} and {@code startTime} parameters should be set according to the UTC
   * Time Zone. See:
   * https://developers.google.com/resources/api-libraries/documentation/storagetransfer/v1/java/latest/com/google/api/services/storagetransfer/v1/model/Schedule.html#getStartTimeOfDay()
   *
   * @return the response TransferJob if the request is successful
   * @throws InstantiationException if instantiation fails when building the TransferJob
   * @throws IllegalAccessException if an illegal access occurs when building the TransferJob
   * @throws IOException if the client failed to complete the request
   */
  public static TransferJob createNearlineTransferJob(
      String projectId,
      String jobDescription,
      String gcsSourceBucket,
      String gcsNearlineSinkBucket,
      String startDate,
      String startTime)
      throws InstantiationException, IllegalAccessException, IOException {
    Date date = TransferJobUtils.createDate(startDate);
    TimeOfDay time = TransferJobUtils.createTimeOfDay(startTime);
    TransferJob transferJob =
        new TransferJob()
            .setDescription(jobDescription)
            .setProjectId(projectId)
            .setTransferSpec(
                new TransferSpec()
                    .setGcsDataSource(new GcsData().setBucketName(gcsSourceBucket))
                    .setGcsDataSink(new GcsData().setBucketName(gcsNearlineSinkBucket))
                    .setObjectConditions(
                        new ObjectConditions()
                            .setMinTimeElapsedSinceLastModification("2592000s" /* 30 days */))
                    .setTransferOptions(
                        new TransferOptions().setDeleteObjectsFromSourceAfterTransfer(true)))
            .setSchedule(new Schedule().setScheduleStartDate(date).setStartTimeOfDay(time))
            .setStatus("ENABLED");

    Storagetransfer client = TransferClientCreator.createStorageTransferClient();
    return client.transferJobs().create(transferJob).execute();
  }

  public static void run(PrintStream out)
      throws InstantiationException, IllegalAccessException, IOException {
    String projectId = TransferJobUtils.getPropertyOrFail("projectId");
    String jobDescription = TransferJobUtils.getPropertyOrFail("jobDescription");
    String gcsSourceBucket = TransferJobUtils.getPropertyOrFail("gcsSourceBucket");
    String gcsNearlineSinkBucket = TransferJobUtils.getPropertyOrFail("gcsNearlineSinkBucket");
    String startDate = TransferJobUtils.getPropertyOrFail("startDate");
    String startTime = TransferJobUtils.getPropertyOrFail("startTime");

    TransferJob responseT =
        createNearlineTransferJob(
            projectId,
            jobDescription,
            gcsSourceBucket,
            gcsNearlineSinkBucket,
            startDate,
            startTime);
    out.println("Return transferJob: " + responseT.toPrettyString());
  }

  /**
   * Output the contents of a successfully created TransferJob.
   *
   * @param args arguments from the command line
   */
  public static void main(String[] args) {
    try {
      run(System.out);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Python

如需了解如何创建 Storage Transfer Service 客户端,请参阅为 Google API 库创建客户端


"""Command-line sample that creates a daily transfer from a standard
GCS bucket to a Nearline GCS bucket for objects untouched for 30 days.

This sample is used on this page:

    https://cloud.google.com/storage/transfer/create-transfer

For more information, see README.md.
"""

import argparse
import datetime
import json

import googleapiclient.discovery

def main(description, project_id, start_date, start_time, source_bucket,
         sink_bucket):
    """Create a daily transfer from Standard to Nearline Storage class."""
    storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')

    # Edit this template with desired parameters.
    transfer_job = {
        'description': description,
        'status': 'ENABLED',
        'projectId': project_id,
        'schedule': {
            'scheduleStartDate': {
                'day': start_date.day,
                'month': start_date.month,
                'year': start_date.year
            },
            'startTimeOfDay': {
                'hours': start_time.hour,
                'minutes': start_time.minute,
                'seconds': start_time.second
            }
        },
        'transferSpec': {
            'gcsDataSource': {
                'bucketName': source_bucket
            },
            'gcsDataSink': {
                'bucketName': sink_bucket
            },
            'objectConditions': {
                'minTimeElapsedSinceLastModification': '2592000s'  # 30 days
            },
            'transferOptions': {
                'deleteObjectsFromSourceAfterTransfer': 'true'
            }
        }
    }

    result = storagetransfer.transferJobs().create(body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(result, indent=4)))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('description', help='Transfer description.')
    parser.add_argument('project_id', help='Your Google Cloud project ID.')
    parser.add_argument('start_date', help='Date YYYY/MM/DD.')
    parser.add_argument('start_time', help='UTC Time (24hr) HH:MM:SS.')
    parser.add_argument('source_bucket', help='Standard GCS bucket name.')
    parser.add_argument('sink_bucket', help='Nearline GCS bucket name.')

    args = parser.parse_args()
    start_date = datetime.datetime.strptime(args.start_date, '%Y/%m/%d')
    start_time = datetime.datetime.strptime(args.start_time, '%H:%M:%S')

    main(
        args.description,
        args.project_id,
        start_date,
        start_time,
        args.source_bucket,
        args.sink_bucket)

检查转移操作状态

当您使用 transferJobs.create 时,将返回 TransferJob 资源。

您可以在使用 transferJobs.get 创建作业后检查转移状态。如果转移作业的操作已开始,则会返回包含已填充的 latestOperationName 字段的 TransferJob。相反,如果转移作业的操作尚未启动,则 latestOperationName 字段为空。

查看转移作业的状态

REST

使用 transferJobs get 发出请求:
GET https://storagetransfer.googleapis.com/v1/{jobName="name"}

取消传输操作

如要取消单个转移操作,请使用 transferOperations cancel 方法。如要删除整个转移作业(包括已安排的未来转移操作),请使用transferJobs patch 方法将转移作业的状态设置为 DELETED。更新作业的转移状态不会影响正在进行的转移操作。如要取消正在进行的转移操作,请使用 transferOperations cancel 方法。

后续步骤

了解如何使用 Cloud Storage