Event-driven transfers

Storage Transfer Service can listen for event notifications in AWS or Google Cloud to automatically transfer data that has been added or updated in the source location. Event-driven transfers are supported from AWS S3 or Cloud Storage to Cloud Storage.

Event-driven transfers listen for Amazon S3 event notifications sent to Amazon SQS for AWS S3 sources. Cloud Storage sources send notifications to a Pub/Sub subscription.

Benefits of event-driven transfers

Because event-driven transfers listen for changes to the source bucket, updates are copied to the destination in near real time. Storage Transfer Service doesn't need to execute a list operation against the source, saving time and money.

Use cases include:

  • Event-driven analytics: Replicate data from AWS to Cloud Storage to perform analytics and processing.

  • Cloud Storage replication: Enable automatic, asynchronous object replication between Cloud Storage buckets.

    Event-driven transfers with Storage Transfer Service differ from typical Cloud Storage replication in that they create a copy of your data in a different bucket.

    This provides benefits such as:

    • Keeping development and production data in separate namespaces.
    • Sharing data without providing access to the original bucket.
    • Backing up to a different continent, or to an area not covered by dual-region and multi-region storage.
  • DR/HA setup: Replicate objects from the source to a backup destination in a matter of minutes:

    • Cross-cloud backup: Create a copy of your AWS S3 backup on Cloud Storage.
    • Cross-region or cross-project backup: Create a copy of your Cloud Storage bucket in a different region or project.
  • Live migration: Event-driven transfers can power low-downtime migration, with downtime measured in minutes, as a follow-up step to a one-time batch migration.

Set up event-driven transfers from Cloud Storage

Event-driven transfers from Cloud Storage use Pub/Sub notifications to know when objects in the source bucket have been modified or added. Object deletions are not detected; deleting an object at the source does not delete the associated object in the destination bucket.

Configure permissions

  1. Find the name of the Storage Transfer Service service agent for your project:

    1. Go to the googleServiceAccounts.get reference page.

      An interactive panel opens, titled Try this method.

    2. In the panel, under Request parameters, enter your project ID. The project you specify here must be the project you're using to manage Storage Transfer Service, which might be different from the source bucket's project.

    3. Click Execute.

    The service agent's email is returned as the value of accountEmail. Copy this value. You can also look it up programmatically; see the sketch after these steps.

    The service agent's email uses the format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Grant the Pub/Sub Subscriber role to the Storage Transfer Service service agent.

    Cloud console

    Follow the instructions in Controlling access through the Google Cloud console to grant the Pub/Sub Subscriber role to the Storage Transfer Service service agent. The role can be granted at the topic, subscription, or project level.

    gcloud CLI

    Follow the instructions in Setting a policy to add the following binding:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
      ]
    }
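
As an alternative to the interactive panel in step 1, here is a minimal sketch that retrieves the service agent email with the Python client library (assuming the google-cloud-storage-transfer package is installed; the project ID is a placeholder):

from google.cloud import storage_transfer

# Look up the Storage Transfer Service service agent for the project
# that manages your transfers.
client = storage_transfer.StorageTransferServiceClient()
response = client.get_google_service_account(
    storage_transfer.GetGoogleServiceAccountRequest(project_id="my-project-id")
)
# Prints project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com
print(response.account_email)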
    

Configure Pub/Sub

  1. Make sure you've satisfied the prerequisites for using Pub/Sub with Cloud Storage.

  2. Configure Pub/Sub notifications for Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
    
  3. Create a pull subscription for the topic:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300
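
These two steps can also be scripted. The following is a minimal sketch using the google-cloud-storage and google-cloud-pubsub Python libraries, with placeholder names matching the commands above:

from google.cloud import pubsub_v1, storage

project_id = "my-project-id"         # placeholder
bucket_name = "BUCKET_NAME"          # placeholder
topic_name = "TOPIC_NAME"            # placeholder
subscription_id = "SUBSCRIPTION_ID"  # placeholder

# Publish notifications for changes in the bucket to the topic,
# with full JSON payloads.
bucket = storage.Client(project=project_id).bucket(bucket_name)
bucket.notification(topic_name=topic_name, payload_format="JSON_API_V1").create()

# Create a pull subscription with a 300-second ack deadline,
# matching the gcloud command above.
subscriber = pubsub_v1.SubscriberClient()
subscriber.create_subscription(
    request={
        "name": subscriber.subscription_path(project_id, subscription_id),
        "topic": subscriber.topic_path(project_id, topic_name),
        "ack_deadline_seconds": 300,
    }
)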
    

Create a transfer job

You can create an event-based transfer job using the REST API or the Google Cloud console.

Don't include sensitive information such as personally identifiable information (PII) or security data in your transfer job name. Resource names may be propagated to the names of other Google Cloud resources and may be exposed to Google-internal systems outside of your project.

Cloud console

  1. Go to the Create transfer job page in the Google Cloud console.

    Go to Create transfer job

  2. Select Cloud Storage as both the source and the destination.

  3. As the Scheduling mode select Event-driven and click Next step.

  4. Select the source bucket for this transfer.

  5. In the Event stream section, enter the subscription name:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Optionally, define any filters, then click Next step.

  7. Select the destination bucket for this transfer.

  8. Optionally, enter a start and end time for the transfer. If you don't specify a time, the transfer starts immediately and runs until manually stopped.

  9. Specify any transfer options. More information is available from the Create transfers page.

  10. Click Create.

Once created, the transfer job starts running and an event listener waits for notifications on the Pub/Sub subscription. The job details page shows one operation each hour, and lists details on data transferred in each job.

REST

To create an event-driven transfer job using the REST API, send the following JSON object to the transferJobs.create endpoint:

{
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec": {
    "gcsDataSource": {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
      "bucketName": "GCS_SINK_NAME"
    }
  },
  "eventStream": {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime and eventStreamExpirationTime are optional. If the start time is omitted, the transfer starts immediately; if the end time is omitted, the transfer continues until manually stopped.
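
The client library samples below omit these timestamps for brevity. As a sketch of how to set them with the Python library (assuming the proto-plus conversion from datetime to Timestamp fields), an EventStream can be constructed as follows and passed as the job's event_stream field:

from datetime import datetime, timezone

from google.cloud import storage_transfer

# Mirrors the eventStreamStartTime / eventStreamExpirationTime fields
# from the REST example above.
event_stream = storage_transfer.EventStream(
    name="projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    event_stream_start_time=datetime(2022, 12, 2, 1, 0, tzinfo=timezone.utc),
    event_stream_expiration_time=datetime(2023, 1, 31, 1, 0, tzinfo=timezone.utc),
)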

Client libraries

Go

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Go API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub subscription to subscribe the event driven transfer to.
	// pubSubId := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Java API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The Pub/Sub subscription ID to subscribe to
    String pubSubId = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, pubSubId);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use a "try-with-resources" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Node.js API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ID of the Pub/Sub subscription to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Python API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


from google.cloud import storage_transfer

def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Set up event-driven transfers from AWS S3

Event-driven transfers from AWS S3 use notifications from Amazon Simple Queue Service (SQS) to know when objects in the source bucket have been modified or added. Object deletions are not detected; deleting an object at the source does not delete the associated object in the destination bucket.

Create an SQS queue

  1. In the AWS console, go to the Simple Queue Service page.

  2. Click Create queue.

  3. Enter a Name for this queue.

  4. In the Access policy section, select Advanced. A JSON object is displayed:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    The values of AWS and Resource are unique for each project.

  5. Copy your specific values of AWS and Resource from the displayed JSON into the following JSON snippet:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }
    

    The placeholder values in the preceding JSON use the following format:

    • AWS is a numeric value representing your Amazon Web Services project. For example, "aws:SourceAccount": "1234567890".
    • RESOURCE is an Amazon Resource Name (ARN) that identifies this queue. For example, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN is an ARN that identifies the source bucket. For example, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". You can find your bucket's ARN from the Properties tab of the bucket details page in the AWS console.
  6. Replace the JSON displayed in the Access policy section with the updated JSON above.

  7. Click Create queue.

Once complete, note the queue's Amazon Resource Name (ARN). It uses the following format:

arn:aws:sqs:us-east-1:1234567890:event-queue
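
If you script queue creation instead of using the console, a boto3 sketch along these lines creates the queue with the access policy attached and retrieves its ARN (the queue name, account numbers, and ARNs below are placeholders):

import json

import boto3

sqs = boto3.client("sqs")

# The access policy from step 5, with your values filled in (placeholders here).
policy = {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "SQS:SendMessage",
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": "1234567890"},
                "ArnLike": {"aws:SourceArn": "arn:aws:s3:::example-aws-bucket"},
            },
        }
    ],
}

# Create the queue with the policy attached.
queue_url = sqs.create_queue(
    QueueName="event-queue",
    Attributes={"Policy": json.dumps(policy)},
)["QueueUrl"]

# Note the queue's ARN; it's needed when creating the transfer job.
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
print(queue_arn)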

Enable notifications on the S3 bucket

  1. In the AWS console, go to the S3 page.

  2. In the Buckets list, select your source bucket.

  3. Select the Properties tab.

  4. In the Event notifications section, click Create event notification.

  5. Specify a name for this event.

  6. In the Event types section, select All object create events.

  7. As the Destination select SQS queue and select the queue you created for this transfer.

  8. Click Save changes.
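
The same notification configuration can be applied with a short boto3 sketch; the bucket name and queue ARN below are placeholders:

import boto3

s3 = boto3.client("s3")

# Send all object-creation events from the source bucket to the SQS queue.
s3.put_bucket_notification_configuration(
    Bucket="example-aws-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "QueueArn": "arn:aws:sqs:us-east-1:1234567890:event-queue",
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)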

Configure permissions

Follow the instructions in Configure access to a source: Amazon S3 to create either an access key ID and secret key, or a Federated Identity role.

Replace the custom permissions JSON with the following:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::AWS_BUCKET_NAME",
                "arn:aws:s3:::AWS_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Once created, note the following information:

  • For a user, note the access key ID and secret key.
  • For a Federated Identity role, note the Amazon Resource Name (ARN), which uses the format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.
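
If you're managing the IAM user with code, one possible way to attach the custom permissions JSON is an inline user policy via boto3. This is a sketch only; the user name, policy name, bucket name, and queue ARN are hypothetical placeholders:

import json

import boto3

iam = boto3.client("iam")

# The permissions JSON from above, with placeholders filled in.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket",
            ],
            "Resource": [
                "arn:aws:s3:::example-aws-bucket",
                "arn:aws:s3:::example-aws-bucket/*",
                "arn:aws:sqs:us-east-1:1234567890:event-queue",
            ],
        }
    ],
}

# Attach the policy inline to the IAM user whose keys the transfer will use.
iam.put_user_policy(
    UserName="transfer-user",
    PolicyName="storage-transfer-event-driven",
    PolicyDocument=json.dumps(policy),
)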

Create a transfer job

You can create an event-based transfer job using the REST API or the Google Cloud console.

Cloud console

  1. Go to the Create transfer job page in the Google Cloud console.

    Go to Create transfer job

  2. Select Amazon S3 as the source type, and Cloud Storage as the destination.

  3. As the Scheduling mode select Event-driven and click Next step.

  4. Enter your S3 bucket name. The bucket name is the name as it appears in the AWS Management Console. For example, my-aws-bucket.

  5. Select your authentication method and enter the requested information, which you created and noted in the previous section.

  6. Enter the Amazon SQS queue ARN that you created earlier. It uses the following format:

    arn:aws:sqs:us-east-1:1234567890:event-queue

  7. Optionally, define any filters, then click Next step.

  8. Select the destination Cloud Storage bucket and, optionally, a path.

  9. Optionally, enter a start and end time for the transfer. If you don't specify a time, the transfer starts immediately and runs until manually stopped.

  10. Specify any transfer options. More information is available from the Create transfers page.

  11. Click Create.

Once created, the transfer job starts running and an event listener waits for notifications on the SQS queue. The job details page shows one operation each hour, and lists details on data transferred in each job.

REST

To create an event-driven transfer job using the REST API, send the following JSON object to the transferJobs.create endpoint:

{
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec": {
    "awsS3DataSource": {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
      "bucketName": "GCS_SINK_NAME"
    }
  },
  "eventStream": {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime and eventStreamExpirationTime are optional. If the start time is omitted, the transfer starts immediately; if the end time is omitted, the transfer continues until manually stopped.

Client libraries

Go

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Go API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SQS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Java API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use a "try-with-resources" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Node.js API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Python API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


from google.cloud import storage_transfer

def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # sqs_queue_arn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")