Event-driven transfers from Cloud Storage

Storage Transfer Service can listen for event notifications in Google Cloud to automatically transfer data that has been added to or updated in a Cloud Storage bucket. Learn more about the benefits of event-driven transfers.

Event-driven transfers from Cloud Storage use Pub/Sub notifications to know when objects in the source bucket have been modified or added. Object deletions are not detected; deleting an object in the source does not delete the associated object in the destination bucket.

Configure permissions

  1. Find the name of the Storage Transfer Service service agent for your project:

    1. Go to the googleServiceAccounts.get reference page.

      An interactive panel titled Try this method opens.

    2. In the panel, under Request parameters, enter your project ID. The project you specify here must be the project you're using to manage Storage Transfer Service, which might be different from the source bucket's project.

    3. Click Execute.

    The service agent's email address is returned as the value of accountEmail. Copy this value.

    The service agent's email address follows the format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Grant the Pub/Sub Subscriber role to the Storage Transfer Service service agent. (A client-library sketch of this lookup and grant follows these steps.)

    Cloud console

    Follow the instructions in Controlling access through the Google Cloud console to grant the Pub/Sub Subscriber role to the Storage Transfer Service service agent. The role can be granted at the topic, subscription, or project level.

    gcloud CLI

    Follow the instructions in Setting a policy to add the following binding:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
      ]
    }
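
If you prefer to script these permission steps, the following Python sketch (all names are hypothetical) looks up the service agent's email address with the client library and grants it roles/pubsub.subscriber on a subscription. It assumes the subscription created in the next section already exists; the role can equally be granted at the topic or project level.

from google.cloud import pubsub_v1, storage_transfer

# Hypothetical values for illustration; replace with your own.
project_id = "my-project-id"
subscription_id = "my-subscription-id"

# Look up the project's Storage Transfer Service service agent
# (the client-library equivalent of calling googleServiceAccounts.get).
transfer_client = storage_transfer.StorageTransferServiceClient()
service_agent = transfer_client.get_google_service_account(
    storage_transfer.GetGoogleServiceAccountRequest(project_id=project_id)
)
member = f"serviceAccount:{service_agent.account_email}"

# Grant roles/pubsub.subscriber on the subscription to the service agent.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
policy = subscriber.get_iam_policy(request={"resource": subscription_path})
policy.bindings.add(role="roles/pubsub.subscriber", members=[member])
subscriber.set_iam_policy(request={"resource": subscription_path, "policy": policy})
print(f"Granted roles/pubsub.subscriber to {member}")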

Configure Pub/Sub

  1. Make sure you've met the prerequisites for using Pub/Sub with Cloud Storage.

  2. Configure Pub/Sub notifications for Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME

  3. Create a pull subscription for the topic (a client-library sketch of these two commands follows this list):

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300
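
The two gcloud commands above can also be scripted. The following Python sketch (bucket, topic, and subscription names are hypothetical) creates the notification configuration and the pull subscription; it assumes the Pub/Sub topic already exists and that the Cloud Storage service agent is allowed to publish to it.

from google.cloud import pubsub_v1, storage

# Hypothetical values for illustration; replace with your own.
project_id = "my-project-id"
bucket_name = "my-source-bucket"
topic_id = "my-topic"
subscription_id = "my-subscription-id"

# Create a Pub/Sub notification configuration on the source bucket.
# JSON_API_V1 payloads carry the object metadata in each notification.
storage_client = storage.Client(project=project_id)
bucket = storage_client.bucket(bucket_name)
notification = bucket.notification(
    topic_name=topic_id,
    topic_project=project_id,
    payload_format="JSON_API_V1",
)
notification.create()

# Create a pull subscription on the topic with a 300-second ack deadline.
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "ack_deadline_seconds": 300,
    }
)
print(f"Created notification on {bucket_name} and subscription {subscription_path}")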

Create a transfer job

You can create an event-driven transfer job by using the REST API, the client libraries, or the Google Cloud console.

Don't include sensitive information such as personally identifiable information (PII) or security data in your transfer job name. Resource names may be propagated to the names of other Google Cloud resources, and may be exposed to Google-internal systems outside of your project.

Cloud console

  1. Go to the Create transfer job page in the Google Cloud console.

    Go to Create transfer job

  2. Select Cloud Storage as both the source type and the destination type.

  3. Select Event-driven as the scheduling mode, then click Next step.

  4. Select a source bucket for this transfer.

  5. In the Event stream section, enter the subscription name:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Optionally, define any filters, then click Next step.

  7. Select a destination bucket for this transfer.

  8. Optionally, enter a start and end time for the transfer. If you don't specify a time, the transfer starts immediately and runs until manually stopped.

  9. Specify any transfer options. For more information, see the Create transfers page.

  10. Click Create.

Once created, the transfer job starts running and an event listener waits for notifications on the Pub/Sub subscription. The job details page shows one operation each hour, and includes details on the data transferred by each job.
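
To check on a job programmatically instead of in the console, the following sketch uses the Python client library (the job name is hypothetical) to fetch the job and print the name of its most recent operation.

from google.cloud import storage_transfer

# Hypothetical values for illustration; replace with your own.
project_id = "my-project-id"
job_name = "transferJobs/example-job-id"

client = storage_transfer.StorageTransferServiceClient()
job = client.get_transfer_job(
    storage_transfer.GetTransferJobRequest(job_name=job_name, project_id=project_id)
)

# latest_operation_name is empty until the first operation has run.
print(f"Job {job.name} is {job.status.name}; latest operation: {job.latest_operation_name!r}")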

REST

To create an event-driven transfer using the REST API, send the following JSON object to the transferJobs.create endpoint:

{
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec": {
    "gcsDataSource": {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
      "bucketName": "GCS_SINK_NAME"
    }
  },
  "eventStream": {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime and eventStreamExpirationTime are optional. If the start time is omitted, the transfer starts immediately; if the end time is omitted, the transfer continues until manually stopped.
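
One way to send this request, shown as a sketch rather than the only option: authenticate with Application Default Credentials and POST the JSON to the transferJobs endpoint. The placeholder values mirror the sample above and are illustrative.

import json

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Placeholder job definition mirroring the JSON sample above.
transfer_job = {
    "description": "YOUR DESCRIPTION",
    "status": "ENABLED",
    "projectId": "PROJECT_ID",
    "transferSpec": {
        "gcsDataSource": {"bucketName": "GCS_SOURCE_NAME"},
        "gcsDataSink": {"bucketName": "GCS_SINK_NAME"},
    },
    "eventStream": {
        "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    },
}

# Authenticate with Application Default Credentials and call transferJobs.create.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)
response = session.post(
    "https://storagetransfer.googleapis.com/v1/transferJobs",
    data=json.dumps(transfer_job),
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()
print("Created job:", response.json()["name"])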

Client libraries

Go

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Go API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports for this sample; the paths assume the current
// cloud.google.com/go/storagetransfer module layout.
import (
	"context"
	"fmt"
	"io"

	storagetransfer "cloud.google.com/go/storagetransfer/apiv1"
	"cloud.google.com/go/storagetransfer/apiv1/storagetransferpb"
)

func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub subscription resource name for the event-driven transfer.
	// pubSubId := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Java API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The Pub/Sub subscription resource name to subscribe to
    String pubSubId = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, pubSubId);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Node.js API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The name of the Pub/Sub subscription to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Python API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")