从 AWS S3 进行事件驱动型转移

Storage Transfer Service 可以监听 AWS 中的事件通知,以自动将在来源位置添加或更新的数据转移到 Cloud Storage 存储桶。详细了解事件驱动型转移的好处。

事件驱动的转移会监听发送到 Amazon SQS 的 Amazon S3 事件通知,以了解源存储桶中的对象何时已修改或添加。系统不会检测对象删除操作;在来源中删除对象不会删除目标存储桶中的关联对象。

事件驱动型转移始终使用 Cloud Storage 存储桶作为目标位置。

创建 SQS 队列

  1. 在 AWS 控制台中,前往 Simple Queue Service 页面。

  2. 点击创建队列

  3. 为此队列输入名称

  4. 访问权限政策部分中,选择高级。此时会显示一个 JSON 对象:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    AWSResource 的值对于每个项目都是唯一的。

  5. 将您的特定 AWSResource 值从显示的 JSON 复制到以下 JSON 代码段:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    上述 JSON 中的占位符值采用以下格式:

    • AWS 是表示 Amazon Web Services 项目的数值。例如 "aws:SourceAccount": "1234567890"
    • RESOURCE 是标识此队列的 Amazon Resource Number (ARN)。例如 "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN 是标识源存储桶的 ARN。例如 "aws:SourceArn": "arn:aws:s3:::example-aws-bucket"。您可以从 AWS 控制台的存储桶详情页面的属性标签页中找到存储桶的 ARN。
  6. 访问权限政策部分中显示的 JSON 替换为上面更新后的 JSON。

  7. 点击创建队列

完成后,记下队列的 Amazon 资源名称 (ARN)。ARN 采用以下格式:

arn:aws:sqs:us-east-1:1234567890:event-queue"

在 S3 存储桶上启用通知

  1. 在 AWS 控制台中,转到 S3 页面。

  2. 存储桶列表中,选择您的源存储桶。

  3. 选择属性标签页。

  4. 活动通知部分,点击创建活动通知

  5. 为此事件指定名称。

  6. 事件类型部分中,选择所有对象创建事件

  7. 选择 SQS 队列作为目标,然后选择您为此转移创建的队列。

  8. 点击保存更改

配置权限

按照“配置对来源的访问权限:Amazon S3”中的说明创建访问密钥 ID 和密钥或 Federated Identity 角色。

将自定义权限 JSON 替换为以下内容:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

创建后,请记下以下信息:

  • 对于用户,请记下访问密钥 ID 和密钥。
  • 对于 Federated Identity 角色,请记下 Amazon 资源名称 (ARN),其格式为 arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME

创建转移作业

您可以使用 REST API 或 Google Cloud 控制台创建基于事件的转移作业。

Cloud 控制台

  1. 前往 Google Cloud 控制台中的创建转移作业页面。

    前往创建转移作业

  2. 选择 Amazon S3 作为来源类型,并选择 Cloud Storage 作为目标位置。

  3. 选择事件驱动型作为时间安排模式,然后点击下一步

  4. 输入您的 S3 存储桶名称。 存储分区名称是其在 AWS 管理控制台中显示的名称。 例如 my-aws-bucket

  5. 选择身份验证方法,然后输入您在上一部分中创建并记下的所需信息。

  6. 输入您之前创建的 Amazon SQS 队列 ARN。它使用以下格式:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. (可选)定义任何过滤条件,然后点击下一步

  8. 选择目标 Cloud Storage 存储桶以及可选的路径。

  9. (可选)输入转移的开始时间和结束时间。如果您未指定时间,转移将立即开始,并会一直运行,直到手动停止为止。

  10. 指定任何转移选项。如需了解详情,请参阅创建转移作业页面。

  11. 点击创建

创建后,转移作业将开始运行,且事件监听器会等待 SQS 队列上的通知。作业详情页面每小时显示一次操作,并包含每项作业转移的数据的详细信息。

REST

如需使用 REST API 创建事件驱动型转移,请将以下 JSON 对象发送到 transferJobs.create 端点:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTimeeventStreamExpirationTime 是可选的。如果省略了开始时间,则转移作业会立即开始;如果省略了结束时间,则转移作业会一直持续,直到手动停止为止。

客户端库

Go

如需了解如何安装和使用 Storage Transfer Service 客户端库,请参阅 Storage Transfer Service 客户端库。 如需了解详情,请参阅 Storage Transfer Service Go API 参考文档

如需向 Storage Transfer Service 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

如需了解如何安装和使用 Storage Transfer Service 客户端库,请参阅 Storage Transfer Service 客户端库。 如需了解详情,请参阅 Storage Transfer Service Java API 参考文档

如需向 Storage Transfer Service 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

如需了解如何安装和使用 Storage Transfer Service 客户端库,请参阅 Storage Transfer Service 客户端库。 如需了解详情,请参阅 Storage Transfer Service Node.js API 参考文档

如需向 Storage Transfer Service 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

如需了解如何安装和使用 Storage Transfer Service 客户端库,请参阅 Storage Transfer Service 客户端库。 如需了解详情,请参阅 Storage Transfer Service Python API 参考文档

如需向 Storage Transfer Service 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")