イベント ドリブン転送

Storage Transfer Service は、AWS または Google Cloud のイベント通知をリッスンし、ソースのロケーションで追加または更新されたデータを自動的に転送できます。AWS S3 または Cloud Storage から Cloud Storage へのイベント ドリブン転送がサポートされています。

イベント ドリブン転送は、Amazon SQS for AWS S3 ソースに送信された Amazon S3 イベント通知をリッスンします。Cloud Storage ソースは Pub/Sub サブスクリプションに通知を送信します。

イベント ドリブン転送のメリット

イベント ドリブン転送は、ソースバケットに対する変更をリッスンするため、更新がほぼリアルタイムで宛先にコピーされます。Storage Transfer Service はソースに対して list オペレーションを実行する必要がないため、時間とコストを節約できます。

次のような用途があります。

  • イベント ドリブン分析: AWS から Cloud Storage にデータを複製して、分析と処理を実行します。

  • Cloud Storage レプリケーション: Cloud Storage バケット間でオブジェクトの非同期の自動レプリケーションを有効にします。

    Storage Transfer Service によるイベント ドリブン転送は、データのコピーを別のバケットに作成する通常の Cloud Storage レプリケーションとは異なります。

    これには、次のような利点があります。

    • 開発データと本番環境のデータを別々の Namespace に保存する。
    • 元のバケットへのアクセス権を付与せずにデータを共有する。
    • 別の大陸、またはデュアルリージョンとマルチリージョンのストレージでカバーされないエリアにバックアップする。
  • DR / HA 設定: オブジェクトを数分以内にソースからバックアップの宛先に複製します。

    • クロスクラウド バックアップ: Cloud Storage に AWS S3 のバックアップのコピーを作成します。
    • クロスリージョンまたはプロジェクト間のバックアップ: 別のリージョンまたはプロジェクトに Cloud Storage バケットのコピーを作成します。
  • ライブ マイグレーション: 1 回限りのバッチ移行のフォローアップとして、イベント ドリブン転送により、ダウンタイムの短い移行を実現できます(数分程度のダウンタイム)。

Cloud Storage からのイベント ドリブン転送を設定する

Cloud Storage からのイベント ドリブン転送では、Pub/Sub 通知を使用して、ソースバケット内のオブジェクトが変更または追加されたタイミングを確認します。オブジェクトの削除は検出されません。転送元でオブジェクトを削除しても、転送先バケット内の関連付けられているオブジェクトは削除されません。

権限を構成する

  1. プロジェクトの Storage Transfer Service サービス エージェントの名前を確認します。

    1. googleServiceAccounts.get リファレンス ページに移動します。

      [Try this method] というインタラクティブ パネルが開きます。

    2. パネルの [Request parameters] にプロジェクト ID を入力します。ここで指定するプロジェクトは Storage Transfer Service の管理に使用しているプロジェクトで、これは、ソースバケットのプロジェクトとは異なる場合があります。

    3. [実行] をクリックします。

    サービス エージェントのメールアドレスが accountEmail の値として返されます。この値をコピーします。

    サービス エージェントのメールアドレスは project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com の形式です。

  2. Pub/Sub Subscriber ロールを Storage Transfer Service サービス エージェントに付与します。

    Cloud コンソール

    Google Cloud コンソールによるアクセス制御の手順で、Storage Transfer Service サービスに Pub/Sub Subscriber ロールを付与します。このロールは、トピック、サブスクリプション、プロジェクト レベルで付与できます。

    gcloud CLI

    ポリシーの設定の手順で、次のバインディングを追加します。

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }
    

Pub/Sub を構成する

  1. Cloud Storage で Pub/Sub の前提条件を満たしていることを確認します。

  2. Cloud Storage の Pub/Sub 通知を構成します。

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
    
  3. トピックに pull サブスクリプションを作成します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300
    

転送ジョブを作成する

REST API または Google Cloud コンソールを使用して、イベントベースの転送ジョブを作成できます。

転送ジョブ名に、個人を特定できる情報(PII)やセキュリティ データなどの機密情報を含めないでください。リソース名は、他の Google Cloud リソースの名前に伝達され、プロジェクト外部の Google 内部システムに公開される場合があります。

Cloud コンソール

  1. Google Cloud コンソールで [転送ジョブを作成] ページに移動します。

    [転送ジョブを作成] に移動

  2. 転送元と転送先の両方で「Cloud Storage」を選択します。

  3. [スケジュール モード] として [イベント ドリブン] を選択し、[次のステップ] をクリックします。

  4. この転送の転送元バケットを選択します。

  5. [イベント ストリーム] セクションで、サブスクリプション名を入力します。

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. 必要に応じてフィルタを定義し、[次のステップ] をクリックします。

  7. この転送の転送先バケットを選択します。

  8. 必要に応じて、転送の開始時刻と終了時刻を入力します。時間を指定しない場合、転送はすぐに開始し、手動で停止するまで実行されます。

  9. 転送オプションを指定します。詳細については、転送の作成ページをご覧ください。

  10. [作成] をクリックします。

作成されると、転送ジョブの実行が開始し、イベント リスナーが Pub/Sub サブスクリプションの通知を待機します。ジョブの詳細ページには、1 時間ごとにオペレーションが 1 つ表示されます。このページには、各ジョブで転送されたデータの詳細も表示されます。

REST

REST API を使用してイベント ドリブン転送を作成するには、次の JSON オブジェクトを transferJobs.create エンドポイントに送信します。

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTimeeventStreamExpirationTime は省略可能です。開始時間を省略すると、転送がすぐに開始します。終了時間を省略すると、手動で停止するまで転送が続行されます。

クライアント ライブラリ

Go

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Go API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Java API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Node.js API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Python API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


from google.cloud import storage_transfer

def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

AWS S3 からのイベント ドリブン転送を設定する

AWS S3 からのイベント ドリブン転送では、Amazon Simple Queue Service(SQS)からの通知を使用して、ソースバケットのオブジェクトが変更または追加されたタイミングを確認します。 オブジェクトの削除は検出されません。転送元でオブジェクトを削除しても、転送先バケット内の関連付けられているオブジェクトは削除されません。

SQS キューを作成する

  1. AWS コンソールで [Simple Queue Service] ページに移動します。

  2. [Create queue] をクリックします。

  3. このキューの名前を入力します。

  4. [Access policy] セクションで、[Advanced] を選択します。JSON オブジェクトが表示されます。

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    AWSResource の値はプロジェクトごとに一意です。

  5. 表示された JSON から AWSResource の特定の値を次の JSON スニペットにコピーします。

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }
    

    上記の JSON 内のプレースホルダの値は次の形式です。

    • AWS は、アマゾン ウェブ サービス プロジェクトを表す数値です。例: "aws:SourceAccount": "1234567890"
    • RESOURCE は、このキューを識別する Amazon Resource Number(ARN)です。例: "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN は、ソースバケットを識別する ARN です。例: "aws:SourceArn": "arn:aws:s3:::example-aws-bucket"バケットの ARN は、AWS コンソールのバケットの詳細ページの [Properties] タブで確認できます。
  6. [Access policy] セクションに表示された JSON を上で更新した JSON に置き換えます。

  7. [Create queue] をクリックします。

完了したら、キューの Amazon Resource Name(ARN)をメモします。ARN の形式は次のとおりです。

arn:aws:sqs:us-east-1:1234567890:event-queue"

S3 バケットで通知を有効にする

  1. AWS コンソールで [S3] ページに移動します。

  2. [Buckets] リストで、ソースバケットを選択します。

  3. [Properties] タブをクリックします。

  4. [Event notifications] で [Create event notification] をクリックします。

  5. このイベントの名前を指定します。

  6. [Event types] で、[All object create events] を選択します。

  7. [Destination] で [SQS queue] を選択し、この転送用に作成したキューを選択します。

  8. [Save changes] をクリックします。

権限を構成する

ソースへのアクセスを構成する: Amazon S3 の手順で、アクセスキー ID と秘密鍵を作成するか、フェデレーション ID ロールを作成します。

カスタム権限の JSON を次のように置き換えます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::AWS_BUCKET_NAME",
                "arn:aws:s3:::AWS_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

作成したら、次の情報をメモします。

  • ユーザーの場合は、アクセスキー ID と秘密鍵をメモします。
  • フェデレーション ID ロールの場合は、arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME という形式の Amazon Resource Name(ARN)をメモします。

転送ジョブを作成する

REST API または Google Cloud コンソールを使用して、イベントベースの転送ジョブを作成できます。

Cloud コンソール

  1. Google Cloud コンソールで [転送ジョブを作成] ページに移動します。

    [転送ジョブを作成] に移動

  2. 転送元のタイプとして Amazon S3、転送先として Cloud Storage を選択します。

  3. [スケジュール モード] として [イベント ドリブン] を選択し、[次のステップ] をクリックします。

  4. S3 のバケット名を入力します。このバケット名は、AWS Management Console に表示される名前です。例: my-aws-bucket

  5. 認証方法を選択し、必要な情報を入力します。必要な情報は前のセクションで作成しました。

  6. 前の手順で作成した Amazon SQS のキューの ARN を入力します。これには次のフォーマットが使用されます。

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. 必要に応じてフィルタを定義し、[次のステップ] をクリックします。

  8. 転送先の Cloud Storage バケットとパス(省略可)を選択します。

  9. 必要に応じて、転送の開始時刻と終了時刻を入力します。時間を指定しない場合、転送はすぐに開始し、手動で停止するまで実行されます。

  10. 転送オプションを指定します。詳細については、転送の作成ページをご覧ください。

  11. [作成] をクリックします。

作成されると、転送ジョブの実行が開始し、イベント リスナーが SQS キューで通知を待機します。ジョブの詳細ページには、1 時間ごとに 1 つのオペレーションが表示されます。このページには、各ジョブで転送されたデータの詳細も表示されます。

REST

REST API を使用してイベント ドリブン転送を作成するには、次の JSON オブジェクトを transferJobs.create エンドポイントに送信します。

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTimeeventStreamExpirationTime は省略可能です。開始時間を省略すると、転送がすぐに開始します。終了時間を省略すると、手動で停止するまで転送が続行されます。

クライアント ライブラリ

Go

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Go API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Java API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Node.js API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Storage Transfer Service 用のクライアント ライブラリをインストールして使用する方法については、Storage Transfer Service のクライアント ライブラリをご覧ください。詳細については、Storage Transfer Service Python API のリファレンス ドキュメントをご覧ください。

Storage Transfer Service の認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


from google.cloud import storage_transfer

def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")