AWS S3의 이벤트 기반 전송

Storage Transfer Service는 AWS의 이벤트 알림을 리슨하여 소스 위치에 추가 또는 업데이트된 데이터를 Cloud Storage 버킷으로 자동으로 전송할 수 있습니다. 이벤트 기반 전송의 이점에 대해 자세히 알아보세요.

이벤트 기반 전송은 Amazon SQS로 전송된 Amazon S3 이벤트 알림을 리슨하여 소스 버킷의 객체가 수정되거나 추가된 시점을 확인합니다. 객체 삭제는 감지되지 않습니다. 소스에서 객체를 삭제해도 대상 버킷에 있는 연결된 객체는 삭제되지 않습니다.

SQS 큐 만들기

  1. AWS 콘솔에서 Simple Queue Service 페이지로 이동합니다.

  2. Create queue(큐 만들기)를 클릭합니다.

  3. 이 큐의 Name(이름)을 입력합니다.

  4. Access policy(액세스 정책) 섹션에서 Advanced(고급)를 선택합니다. 다음과 같이 JSON 객체가 표시됩니다.

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    AWSResource 값은 프로젝트마다 고유합니다.

  5. 표시된 JSON에서 특정 AWSResource 값을 다음 JSON 스니펫으로 복사합니다.

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    앞의 JSON에서 자리표시자 값은 다음 형식을 사용합니다.

    • AWS는 Amazon 웹 서비스 프로젝트를 나타내는 숫자 값입니다. 예를 들면 "aws:SourceAccount": "1234567890"입니다.
    • RESOURCE는 이 큐를 식별하는 Amazon Resource Number(ARN)입니다. 예를 들면 "Resource": "arn:aws:sqs:us-west-2:01234567890:test"입니다.
    • S3_BUCKET_ARN은 소스 버킷을 식별하는 ARN입니다. 예를 들면 "aws:SourceArn": "arn:aws:s3:::example-aws-bucket"입니다. AWS 콘솔의 버킷 세부정보 페이지에 있는 Properties(속성) 탭에서 버킷의 ARN을 찾을 수 있습니다.
  6. Access policy(액세스 정책) 섹션에 표시된 JSON을 위의 업데이트된 JSON으로 바꿉니다.

  7. Create queue(큐 만들기)를 클릭합니다.

완료되면 큐의 Amazon 리소스 이름(ARN)을 기록합니다. ARN의 형식은 다음과 같습니다.

arn:aws:sqs:us-east-1:1234567890:event-queue"

S3 버킷에서 알림 사용 설정

  1. AWS 콘솔에서 S3 페이지로 이동합니다.

  2. Buckets(버킷) 목록에서 소스 버킷을 선택합니다.

  3. 속성 탭을 선택합니다.

  4. Event notifications(이벤트 알림) 섹션에서 Create event notification(이벤트 알림 만들기)을 클릭합니다.

  5. 이 이벤트의 이름을 지정합니다.

  6. Event types(이벤트 유형) 섹션에서 All object create events(모든 객체 생성 이벤트)를 선택합니다.

  7. Destination(대상)에서 SQS queue(SQS 큐)를 선택하고 이 전송용으로 만든 큐를 선택합니다.

  8. 변경사항 저장을 클릭합니다.

권한 구성

소스에 대한 액세스 구성: Amazon S3의 안내에 따라 액세스 키 ID 및 보안 비밀 키 또는 제휴 ID 역할을 만듭니다.

커스텀 권한 JSON을 다음과 같이 대체합니다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

생성된 후에는 다음 정보를 기록하세요.

  • 사용자의 경우 액세스 키 ID 및 보안 비밀 키를 기록해 둡니다.
  • 제휴 ID 역할의 경우 arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME 형식의 Amazon 리소스 이름(ARN)을 기록합니다.

전송 작업 만들기

REST API 또는 Google Cloud 콘솔을 사용하여 이벤트 기반 전송 작업을 만들 수 있습니다.

Cloud 콘솔

  1. Google Cloud 콘솔에서 전송 작업 만들기 페이지로 이동합니다.

    전송 작업 만들기로 이동

  2. 소스 유형으로 Amazon S3를 선택하고 대상으로 Cloud Storage를 선택합니다.

  3. 예약 모드이벤트 기반을 선택하고 다음 단계를 클릭합니다.

  4. S3 버킷 이름을 입력합니다. 버킷 이름은 AWS 관리 콘솔에 나타나는 이름입니다. 예를 들면 my-aws-bucket입니다.

  5. 인증 방법을 선택하고 이전 섹션에서 만들고 기록한 요청받은 정보를 입력합니다.

  6. 앞에서 만든 Amazon SQS 큐 ARN을 입력합니다. 다음 형식을 사용합니다.

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. 원하는 경우 필터를 정의한 후 다음 단계를 클릭합니다.

  8. 대상 Cloud Storage 버킷과 경로(선택사항)를 선택합니다.

  9. 원하는 경우 전송 시작 시간과 종료 시간을 입력합니다. 시간을 지정하지 않으면 전송이 즉시 시작되고 수동으로 중지될 때까지 실행됩니다.

  10. 전송 옵션을 지정합니다. 자세한 내용은 전송 만들기 페이지에서 확인할 수 있습니다.

  11. 만들기를 클릭합니다.

생성된 후에는 전송 작업이 시작되고 이벤트 리스너가 SQS 큐에서 알림을 기다립니다. 작업 세부정보 페이지에는 시간당 하나의 작업이 표시되며 각 작업에 전송되는 데이터에 대한 세부정보가 포함됩니다.

REST

REST API를 사용하여 이벤트 기반 전송을 만들려면 다음 JSON 객체를 transferJobs.create 엔드포인트로 전송합니다.

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTimeeventStreamExpirationTime은 선택사항입니다. 시작 시간을 생략하면 전송이 즉시 시작되고, 종료 시간을 생략하면 전송이 수동으로 중지될 때까지 계속됩니다.

클라이언트 라이브러리

Go

Storage Transfer Service용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Storage Transfer Service 클라이언트 라이브러리를 참조하세요. 자세한 내용은 Storage Transfer Service Go API 참고 문서를 참조하세요.

Storage Transfer Service에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Storage Transfer Service용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Storage Transfer Service 클라이언트 라이브러리를 참조하세요. 자세한 내용은 Storage Transfer Service Java API 참고 문서를 참조하세요.

Storage Transfer Service에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Storage Transfer Service용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Storage Transfer Service 클라이언트 라이브러리를 참조하세요. 자세한 내용은 Storage Transfer Service Node.js API 참고 문서를 참조하세요.

Storage Transfer Service에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Storage Transfer Service용 클라이언트 라이브러리를 설치하고 사용하는 방법은 Storage Transfer Service 클라이언트 라이브러리를 참조하세요. 자세한 내용은 Storage Transfer Service Python API 참고 문서를 참조하세요.

Storage Transfer Service에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")