Transfer berbasis peristiwa dari AWS S3

Storage Transfer Service dapat memproses notifikasi peristiwa di AWS untuk mentransfer data yang telah ditambahkan atau diperbarui di lokasi sumber secara otomatis ke bucket Cloud Storage. Pelajari lebih lanjut manfaat transfer berbasis peristiwa.

Transfer berbasis peristiwa memproses Notifikasi Peristiwa Amazon S3 yang dikirim ke Amazon SQS untuk mengetahui kapan objek di bucket sumber telah diubah atau ditambahkan. Penghapusan objek tidak terdeteksi; menghapus objek di sumber tidak akan menghapus objek terkait di bucket tujuan.

Transfer berbasis peristiwa selalu menggunakan bucket Cloud Storage sebagai tujuan.

Membuat antrean SQS

  1. Di konsol AWS, buka halaman Simple Queue Service.

  2. Klik Buat antrean.

  3. Masukkan Nama untuk antrean ini.

  4. Di bagian Kebijakan akses, pilih Lanjutan. Objek JSON akan ditampilkan:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Nilai AWS dan Resource bersifat unik untuk setiap project.

  5. Salin nilai AWS dan Resource tertentu dari JSON yang ditampilkan ke dalam cuplikan JSON berikut:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Nilai placeholder dalam JSON sebelumnya menggunakan format berikut:

    • AWS adalah nilai numerik yang mewakili project Amazon Web Services Anda. Contoh, "aws:SourceAccount": "1234567890".
    • RESOURCE adalah Amazon Resource Number (ARN) yang mengidentifikasi antrean ini. Contoh, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN adalah ARN yang mengidentifikasi bucket sumber. Contoh, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". Anda dapat menemukan ARN bucket dari tab Properties di halaman detail bucket di konsol AWS.
  6. Ganti JSON yang ditampilkan di bagian Kebijakan akses dengan JSON yang diperbarui di atas.

  7. Klik Buat antrean.

Setelah selesai, catat Amazon Resource Name (ARN) antrean. ARN memiliki format berikut:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Mengaktifkan notifikasi di bucket S3

  1. Di konsol AWS, buka halaman S3.

  2. Di daftar Buckets, pilih bucket sumber Anda.

  3. Pilih tab Properties.

  4. Di bagian Notifikasi acara, klik Buat notifikasi acara.

  5. Tentukan nama untuk peristiwa ini.

  6. Di bagian Jenis peristiwa, pilih Semua peristiwa pembuatan objek.

  7. Sebagai Tujuan, pilih Antrean SQS, lalu pilih antrean yang Anda buat untuk transfer ini.

  8. Klik Simpan perubahan.

Konfigurasikan izin

Ikuti petunjuk di bagian Mengonfigurasi akses ke sumber: Amazon S3 untuk membuat ID kunci akses dan kunci rahasia, atau peran Federated Identity.

Ganti JSON izin kustom dengan yang berikut:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Setelah dibuat, catat informasi berikut:

  • Untuk pengguna, catat ID kunci akses dan kunci rahasia.
  • Untuk peran Federated Identity, catat Amazon Resource Name (ARN), yang memiliki format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Membuat tugas transfer

Anda dapat menggunakan REST API atau konsol Google Cloud untuk membuat tugas transfer berbasis peristiwa.

Cloud Console

  1. Buka halaman Create transfer job di konsol Google Cloud .

    Buka Buat tugas transfer

  2. Pilih Amazon S3 sebagai jenis sumber, dan Cloud Storage sebagai tujuan.

  3. Sebagai Mode penjadwalan, pilih Event-driven, lalu klik Langkah berikutnya.

  4. Masukkan nama bucket S3 Anda. Nama bucket adalah nama seperti yang muncul di AWS Management Console. Contoh, my-aws-bucket.

  5. Pilih metode autentikasi dan masukkan informasi yang diminta, yang Anda buat dan catat di bagian sebelumnya.

  6. Masukkan ARN antrean Amazon SQS yang Anda buat sebelumnya. Kode ini menggunakan format berikut:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Secara opsional, tentukan filter, lalu klik Langkah berikutnya.

  8. Pilih bucket Cloud Storage tujuan dan, secara opsional, jalur.

  9. Secara opsional, masukkan waktu mulai dan berakhir untuk transfer. Jika Anda tidak menentukan waktu, transfer akan segera dimulai dan akan berjalan hingga dihentikan secara manual.

  10. Tentukan opsi transfer. Informasi selengkapnya tersedia di halaman Buat transfer.

  11. Klik Create.

Setelah dibuat, tugas transfer akan mulai berjalan dan pemroses peristiwa menunggu notifikasi di antrean SQS. Halaman detail tugas menampilkan satu operasi setiap jam, dan menyertakan detail tentang data yang ditransfer untuk setiap tugas.

REST

Untuk membuat transfer berbasis peristiwa menggunakan REST API, kirim objek JSON berikut ke endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime dan eventStreamExpirationTime bersifat opsional. Jika waktu mulai dihilangkan, transfer akan langsung dimulai; jika waktu akhir dihilangkan, transfer akan berlanjut hingga dihentikan secara manual.

Library klien

Go

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Storage Transfer Service Go API.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Storage Transfer Service Java API.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Storage Transfer Service Node.js API.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Storage Transfer Service Python API.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")