Trasferimenti basati su eventi da AWS S3

Storage Transfer Service può ascoltare le notifiche di eventi in AWS per trasferire automaticamente i dati aggiunti o aggiornati nella posizione di origine in un bucket Cloud Storage. Scopri di più sui vantaggi dei trasferimenti basati su eventi.

I trasferimenti basati su eventi ascoltano le notifiche di eventi di Amazon S3 inviate ad Amazon SQS per sapere quando gli oggetti nel bucket di origine sono stati modificati o aggiunti. Le eliminazioni di oggetti non vengono rilevate; l'eliminazione di un oggetto nell'origine non comporta l'eliminazione dell'oggetto associato nel bucket di destinazione.

Crea una coda SQS

  1. Nella console AWS, vai alla pagina Simple Queue Service.

  2. Fai clic su Crea coda.

  3. Inserisci un nome per la coda.

  4. Nella sezione Criteri di accesso, seleziona Avanzate. Viene visualizzato un oggetto JSON:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    I valori di AWS e Resource sono univoci per ogni progetto.

  5. Copia i valori specifici di AWS e Resource dal codice JSON visualizzato nel seguente snippet JSON:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    I valori dei segnaposto nel JSON precedente utilizzano il seguente formato:

    • AWS è un valore numerico che rappresenta il tuo progetto Amazon Web Services. Ad esempio, "aws:SourceAccount": "1234567890".
    • RESOURCE è un numero di risorsa Amazon (ARN) che identifica questa coda. Ad esempio, "Resource": "arn:aws:sqs:us-west-2:01234567890:test".
    • S3_BUCKET_ARN è un ARN che identifica il bucket di origine. Ad esempio, "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". Puoi trovare l'ARN di un bucket dalla scheda Proprietà della pagina dei dettagli del bucket nella console AWS.
  6. Sostituisci il codice JSON visualizzato nella sezione Criterio di accesso con il codice JSON aggiornato sopra.

  7. Fai clic su Crea coda.

Al termine, prendi nota dell'Amazon Resource Name (ARN) della coda. L'ARN ha il seguente formato:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Attivare le notifiche nel bucket S3

  1. Nella console AWS, vai alla pagina S3.

  2. Nell'elenco Bucket, seleziona il bucket di origine.

  3. Seleziona la scheda Proprietà.

  4. Nella sezione Notifiche di eventi, fai clic su Crea notifica di evento.

  5. Specifica un nome per questo evento.

  6. Nella sezione Tipi di eventi, seleziona Tutti gli eventi di creazione di oggetti.

  7. Come Destinazione, seleziona Coda SQS e la coda che hai creato per questo trasferimento.

  8. Fai clic su Salva modifiche.

Configura autorizzazioni

Segui le istruzioni riportate in Configurare l'accesso a un'origine: Amazon S3 per creare un ID chiave di accesso e una chiave segreta o un ruolo di identità federata.

Sostituisci il file JSON delle autorizzazioni personalizzate con quanto segue:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Una volta creato, prendi nota delle seguenti informazioni:

  • Per un utente, prendi nota dell'ID chiave di accesso e della chiave segreta.
  • Per un ruolo Federated Identity, prendi nota dell'ARN (Amazon Resource Name), che ha il formato arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Creare un job di trasferimento

Puoi utilizzare l'API REST o la console Google Cloud per creare un job di trasferimento basato su eventi.

console Cloud

  1. Vai alla pagina Crea job di trasferimento nella console Google Cloud.

    Vai a Crea job di trasferimento

  2. Seleziona Amazon S3 come tipo di origine e Cloud Storage come destinazione.

  3. Come Modalità di pianificazione, seleziona In base agli eventi e fai clic su Passaggio successivo.

  4. Inserisci il nome del bucket S3. Il nome del bucket è il nome visualizzato nella Console di gestione AWS. Ad esempio, my-aws-bucket.

  5. Seleziona il metodo di autenticazione e inserisci le informazioni richieste, che hai creato e annotato nella sezione precedente.

  6. Inserisci l'ARN della coda Amazon SQS che hai creato in precedenza. Utilizza il seguente formato:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Se vuoi, definisci i filtri, poi fai clic su Passaggio successivo.

  8. Seleziona il bucket Cloud Storage di destinazione e, facoltativamente, il percorso.

  9. Facoltativamente, inserisci un'ora di inizio e di fine per il trasferimento. Se non specifichi un orario, il trasferimento inizierà immediatamente e verrà eseguito finché non verrà interrotto manualmente.

  10. Specifica eventuali opzioni di trasferimento. Maggiori informazioni sono disponibili nella pagina Crea trasferimenti.

  11. Fai clic su Crea.

Una volta creato, il job di trasferimento inizia a essere eseguito e un gestore eventi attende le notifiche nella coda SQS. La pagina dei dettagli del job mostra un'operazione ogni ora e include i dettagli sui dati trasferiti per ogni job.

REST

Per creare un trasferimento basato su eventi utilizzando l'API REST, invia il seguente oggetto JSON all'endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime e eventStreamExpirationTime sono facoltativi. Se l'ora di inizio viene omessa, il trasferimento inizia immediatamente; se l'ora di fine viene omessa, il trasferimento continua finché non viene interrotto manualmente.

Librerie client

Go

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Go Storage Transfer Service.

Per autenticarti a Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Storage Transfer Service.

Per autenticarti a Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Storage Transfer Service.

Per autenticarti a Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Storage Transfer Service.

Per autenticarti a Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")