Transferencias basadas en eventos desde AWS S3

Storage Transfer Service puede monitorizar las notificaciones de eventos en AWS para transferir automáticamente los datos que se hayan añadido o actualizado en la ubicación de origen a un segmento de Cloud Storage. Más información sobre las ventajas de las transferencias basadas en eventos

Las transferencias basadas en eventos monitorizan las notificaciones de eventos de Amazon S3 que se envían a Amazon SQS para saber cuándo se han modificado o añadido objetos en el segmento de origen. No se detectan las eliminaciones de objetos. Si se elimina un objeto en el origen, no se elimina el objeto asociado en el segmento de destino.

Las transferencias basadas en eventos siempre usan un segmento de Cloud Storage como destino.

Antes de empezar

Sigue las instrucciones para conceder los permisos necesarios en el segmento de Cloud Storage de destino:

Crear una cola de SQS

  1. En la consola de AWS, ve a la página Simple Queue Service.

  2. Haz clic en Crear cola.

  3. Introduce un nombre para esta cola.

  4. En la sección Política de acceso, selecciona Avanzado. Se muestra un objeto JSON:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Los valores de AWS y Resource son únicos para cada proyecto.

  5. Copia los valores específicos de AWS y Resource del JSON que se muestra en el siguiente fragmento de JSON:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Los valores de los marcadores de posición del JSON anterior usan el siguiente formato:

    • AWS es un valor numérico que representa tu proyecto de Amazon Web Services. Por ejemplo, "aws:SourceAccount": "1234567890".
    • RESOURCE es un número de recurso de Amazon (ARN) que identifica esta cola. Por ejemplo, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN es un ARN que identifica el bucket de origen. Por ejemplo, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". Puedes encontrar el ARN de un segmento en la pestaña Propiedades de la página de detalles del segmento en la consola de AWS.
  6. Sustituye el JSON que se muestra en la sección Política de acceso por el JSON actualizado que aparece arriba.

  7. Haz clic en Crear cola.

Cuando se haya completado, anota el nombre de recurso de Amazon (ARN) de la cola. El ARN tiene el siguiente formato:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Habilita las notificaciones en tu contenedor de S3

  1. En la consola de AWS, ve a la página S3.

  2. En la lista Segmentos, selecciona el segmento de origen.

  3. Selecciona la pestaña Propiedades.

  4. En la sección Notificaciones de eventos, haz clic en Crear notificación de evento.

  5. Especifique un nombre para este evento.

  6. En la sección Tipos de evento, selecciona Todos los eventos de creación de objetos.

  7. En Destino, seleccione Cola de SQS y elija la cola que ha creado para esta transferencia.

  8. Haz clic en Guardar cambios.

Configurar permisos

Sigue las instrucciones de Configurar el acceso a una fuente: Amazon S3 para crear un ID de clave de acceso y una clave secreta, o un rol de identidad federada.

Sustituye el JSON de permisos personalizados por el siguiente:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Una vez creado, anota la siguiente información:

  • En el caso de un usuario, anota el ID de clave de acceso y la clave secreta.
  • En el caso de un rol de identidad federada, anota el nombre de recurso de Amazon (ARN), que tiene el formato arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Crear una tarea de transferencia

Puedes usar la API REST o la Google Cloud consola para crear un trabajo de transferencia basado en eventos.

consola de Cloud

  1. Ve a la página Crear tarea de transferencia de la Google Cloud consola.

    Ve a Crear tarea de transferencia.

  2. Selecciona Amazon S3 como tipo de fuente y Cloud Storage como destino.

  3. En Modo de programación, selecciona Basado en eventos y haz clic en Siguiente paso.

  4. Introduce el nombre del segmento de S3. El nombre del contenedor es el nombre tal como aparece en la consola de administración de AWS. Por ejemplo, my-aws-bucket.

  5. Selecciona el método de autenticación e introduce la información solicitada, que has creado y anotado en la sección anterior.

  6. Introduce el ARN de la cola de Amazon SQS que has creado anteriormente. Utiliza el siguiente formato:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Si quieres, define los filtros que quieras y, a continuación, haz clic en Paso siguiente.

  8. Selecciona el segmento de Cloud Storage de destino y, opcionalmente, la ruta.

  9. Si quiere, puede introducir una hora de inicio y de finalización para la transferencia. Si no especificas una hora, la transferencia se iniciará inmediatamente y se ejecutará hasta que la detengas manualmente.

  10. Especifica las opciones de transferencia que quieras. Para obtener más información, consulta la página Crear transferencias.

  11. Haz clic en Crear.

Una vez creada, la tarea de transferencia empieza a ejecutarse y un receptor de eventos espera las notificaciones en la cola de SQS. La página de detalles del trabajo muestra una operación cada hora e incluye información sobre los datos transferidos de cada trabajo.

REST

Para crear una transferencia basada en eventos mediante la API REST, envía el siguiente objeto JSON al endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Los elementos eventStreamStartTime y eventStreamExpirationTime son opcionales. Si se omite la hora de inicio, la transferencia comenzará inmediatamente. Si se omite la hora de finalización, la transferencia continuará hasta que se detenga manualmente.

Bibliotecas de cliente

Go

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Go del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Java del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Node.js del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Python del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")