Transferências baseadas em eventos do AWS S3

O Serviço de transferência do Cloud Storage pode detectar notificações de eventos na AWS para transferir automaticamente dados que foram adicionados ou atualizados no local de origem para um bucket do Cloud Storage. Saiba mais sobre os benefícios das transferências baseadas em eventos.

As transferências orientadas por eventos detectam notificações de eventos do Amazon S3 enviadas ao Amazon SQS para saber quando objetos no bucket de origem foram modificados ou adicionados. As exclusões de objetos não são detectadas. Excluir um objeto na origem não exclui o objeto associado no bucket de destino.

As transferências baseadas em eventos sempre usam um bucket do Cloud Storage como destino.

Criar uma fila do SQS

  1. No console da AWS, acesse a página Simple Queue Service.

  2. Clique em Criar fila.

  3. Insira um Nome para essa fila.

  4. Na seção Política de acesso, selecione Avançado. Um objeto JSON é exibido:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Os valores de AWS e Resource são exclusivos para cada projeto.

  5. Copie os valores específicos de AWS e Resource do JSON exibido para o seguinte snippet JSON:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Os valores dos marcadores de posição no JSON anterior usam este formato:

    • AWS é um valor numérico que representa o projeto do Amazon Web Services. Por exemplo, "aws:SourceAccount": "1234567890".
    • RESOURCE é um Número de Recurso da Amazon (ARN) que identifica essa fila. Por exemplo, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN é um ARN que identifica o bucket de origem. Por exemplo, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". É possível encontrar o ARN de um bucket na guia Propriedades da página de detalhes do bucket no console da AWS.
  6. Substitua o JSON exibido na seção Política de acesso pelo JSON atualizado acima.

  7. Clique em Criar fila.

Quando for concluído, anote o nome de recurso da Amazon (ARN) na fila. O ARN tem o seguinte formato:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Ativar notificações no bucket S3

  1. No console da AWS, acesse a página S3.

  2. Na lista Buckets, selecione o bucket de origem.

  3. Selecione a guia Propriedades.

  4. Na seção Notificações de eventos, clique em Criar notificações de eventos.

  5. Especifique um nome para esse evento.

  6. Na seção Tipos de evento, selecione Todos os eventos de criação de objeto.

  7. Em Destino, selecione Fila SQS e selecione a fila criada para essa transferência.

  8. Clique em Salvar alterações.

Configurar permissões

Siga as instruções em Configurar o acesso a uma origem: Amazon S3 para criar um ID de chave de acesso e uma chave secreta, ou um papel de identidade federada.

Substitua o JSON de permissões personalizadas pelo seguinte:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Após a criação, observe as seguintes informações:

  • Para um usuário, anote o ID da chave de acesso e a chave secreta.
  • Para um papel de identidade federada, anote o Amazon Resource Name (ARN), que tem o formato arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Criar um job de transferência

Use a API REST ou o console do Google Cloud para criar um job de transferência baseado em eventos.

console do Cloud

  1. Acesse a página Criar job de transferência no console do Google Cloud .

    Acesse Criar job de transferência.

  2. Selecione Amazon S3 como o tipo de origem e Cloud Storage como destino.

  3. No Modo de programação, selecione Baseado em eventos e clique em Próxima etapa.

  4. Insira o nome do bucket S3. O nome do bucket é o nome exibido no AWS Management Console. Por exemplo, my-aws-bucket.

  5. Selecione seu método de autenticação e insira as informações solicitadas, que você criou e anotou na seção anterior.

  6. Digite o ARN da fila do Amazon SQS que você criou anteriormente. Ele usa o seguinte formato:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Se quiser, defina qualquer filtro e clique em Próxima etapa.

  8. Selecione o bucket de destino do Cloud Storage e, opcionalmente, o caminho.

  9. Também é possível digitar um horário de início e de término para a transferência. Se você não especificar um horário, a transferência será iniciada imediatamente e será executada até ser interrompida manualmente.

  10. Especifique as opções de transferência. Veja mais informações na página Criar transferências.

  11. Clique em Criar.

Depois de criado, o job de transferência começa a ser executado e um listener de eventos aguarda notificações na fila do SQS. A página de detalhes do job mostra uma operação a cada hora e inclui detalhes sobre os dados transferidos para cada job.

REST

Para criar uma transferência baseada em eventos usando a API REST, envie o seguinte objeto JSON para o endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

O eventStreamStartTime e o eventStreamExpirationTime são opcionais. Se o horário de início for omitido, a transferência começará imediatamente. Se o horário de término for omitido, a transferência continuará até que seja interrompida manualmente.

Bibliotecas de cliente

Go

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Go do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Java do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Node.js do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Python do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")