Ereignisgesteuerte Übertragungen

Storage Transfer Service kann auf Ereignisbenachrichtigungen in AWS oder Google Cloud warten, um Daten automatisch zu übertragen, die am Quellspeicherort hinzugefügt oder aktualisiert wurden. Ereignisgesteuerte Übertragungen von AWS S3 oder Cloud Storage zu Cloud Storage werden unterstützt.

Ereignisgesteuerte Übertragungen warten auf Amazon S3-Ereignisbenachrichtigungen, die an Amazon SQS für AWS S3-Quellen gesendet werden. Cloud Storage-Quellen senden Benachrichtigungen an ein Pub/Sub-Abo.

Vorteile ereignisgesteuerter Übertragungen

Da ereignisgesteuerte Übertragungen auf Änderungen am Quell-Bucket warten, werden Aktualisierungen nahezu in Echtzeit an das Ziel kopiert. Storage Transfer Service muss keinen Listenvorgang für die Quelle ausführen. Das spart Zeit und Geld.

Dies ist unter anderem in folgenden Fällen hilfreich:

  • Ereignisgesteuerte Analysen: Replizieren Sie Daten von AWS in Cloud Storage, um Analysen und Verarbeitung durchzuführen.

  • Cloud Storage-Replikation: Aktivieren Sie die automatische, asynchrone Objektreplikation zwischen Cloud Storage-Buckets.

    Ereignisgesteuerte Übertragungen mit Storage Transfer Service unterscheiden sich von der typischen Cloud Storage-Replikation dadurch, dass eine Kopie der Daten in einem anderen Bucket erstellt wird.

    Das bietet unter anderem folgende Vorteile:

    • Entwicklungs- und Produktionsdaten werden in separaten Namespaces aufbewahrt.
    • Daten freigeben, ohne Zugriff auf den ursprünglichen Bucket bereitzustellen.
    • Sicherung auf einen anderen Kontinent oder in einem Bereich, der nicht von der Speicherung in zwei oder mehr Regionen abgedeckt wird
  • DR/Hochverfügbarkeit einrichten: Replizieren Sie Objekte in der Reihenfolge von Minuten von der Quelle zum Sicherungsziel:

    • Cloudübergreifende Sicherung: Erstellen Sie eine Kopie der AWS S3-Sicherung in Cloud Storage.
    • Regions- oder projektübergreifende Sicherung: Erstellen Sie eine Kopie des Cloud Storage-Buckets in einer anderen Region oder einem anderen Projekt.
  • Live-Migration: Eine ereignisgesteuerte Übertragung kann eine Migration mit geringen Ausfallzeiten in der Reihenfolge von Ausfallzeiten als Folgeschritt zur einmaligen Batch-Migration durchführen.

Ereignisgesteuerte Übertragungen aus Cloud Storage einrichten

Ereignisgesteuerte Übertragungen aus Cloud Storage verwenden Pub/Sub-Benachrichtigungen, um zu erfahren, wenn Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Objektlöschungen werden nicht erkannt. Wenn Sie ein Objekt an der Quelle löschen, wird das zugehörige Objekt nicht im Ziel-Bucket gelöscht.

Berechtigungen konfigurieren

  1. Suchen Sie den Namen des Storage Transfer Service-Dienst-Agents für Ihr Projekt:

    1. Rufen Sie die Referenzseite googleServiceAccounts.get auf.

      Es wird ein interaktives Steuerfeld mit dem Titel Diese Methode testen geöffnet.

    2. Geben Sie im Steuerfeld unter Anfrageparameter Ihre Projekt-ID ein. Das hier angegebene Projekt muss das Projekt sein, das Sie zur Verwaltung des Storage Transfer Service verwenden. Dieses kann sich vom Projekt des Quell-Buckets unterscheiden.

    3. Klicken Sie auf Execute.

    Die E-Mail-Adresse des Dienst-Agents wird als Wert von accountEmail zurückgegeben. Kopieren Sie diesen Wert.

    Die E-Mail des Dienst-Agents hat das Format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Weisen Sie dem Storage Transfer Service-Dienst-Agent die Rolle Pub/Sub Subscriber zu.

    Cloud Console

    Folgen Sie der Anleitung unter Zugriff über die Google Cloud Console steuern, um dem Storage Transfer Service-Dienst die Rolle Pub/Sub Subscriber zuzuweisen. Die Rolle kann auf Themen-, Abo- oder Projektebene gewährt werden.

    gcloud CLI

    Folgen Sie der Anleitung unter Richtlinie festlegen, um die folgende Bindung hinzuzufügen:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }
    

Pub/Sub konfigurieren

  1. Prüfen Sie, ob Sie die Voraussetzungen für die Verwendung von Pub/Sub mit Cloud Storage erfüllen.

  2. Konfigurieren Sie Pub/Sub-Benachrichtigungen für Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
    
  3. Erstellen Sie ein Pull-Abo für das Thema:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300
    

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud Console verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Der Name des Übertragungsjobs darf keine vertraulichen Informationen wie personenidentifizierbare Informationen oder Sicherheitsdaten enthalten. Ressourcennamen können an die Namen anderer Google Cloud-Ressourcen weitergegeben und für Google-interne Systeme außerhalb Ihres Projekts bereitgestellt werden.

Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite Übertragungsjob erstellen auf.

    Gehen Sie zu Übertragungsjob erstellen

  2. Wählen Sie Cloud Storage sowohl als Quelle als auch als Ziel aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Wählen Sie den Quell-Bucket für diese Übertragung aus.

  5. Geben Sie im Bereich Ereignisstream den Namen des Abos ein:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Legen Sie gegebenenfalls Filter fest und klicken Sie dann auf Nächster Schritt.

  7. Wählen Sie den Ziel-Bucket für diese Übertragung aus.

  8. Geben Sie optional eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, wird die Übertragung sofort gestartet und so lange ausgeführt, bis sie manuell beendet wird.

  9. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  10. Klicken Sie auf Erstellen.

Nach der Erstellung wird der Übertragungsjob ausgeführt und ein Event-Listener wartet auf Benachrichtigungen für das Pub/Sub-Abo. Auf der Seite mit den Jobdetails wird pro Stunde ein Vorgang angezeigt, einschließlich Details zu den für jeden Job übertragenen Daten.

REST

Zum Erstellen einer ereignisgesteuerten Übertragung mit der REST API senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wird die Endzeit weggelassen, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer

def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Ereignisgesteuerte Übertragungen von AWS S3 einrichten

Ereignisgesteuerte Übertragungen von AWS S3 verwenden Benachrichtigungen von Amazon Simple Queue Service (SQS), um zu erfahren, wenn Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Objektlöschungen werden nicht erkannt. Wenn Sie ein Objekt an der Quelle löschen, wird das zugehörige Objekt nicht im Ziel-Bucket gelöscht.

SQS-Warteschlange erstellen

  1. Rufen Sie in der AWS-Konsole die Seite Simple Queue Service (Einfacher Warteschlangendienst) auf.

  2. Klicken Sie auf Warteschlange erstellen.

  3. Geben Sie einen Namen für die Warteschlange ein.

  4. Wählen Sie im Bereich Zugriffsrichtlinie die Option Erweitert aus. Ein JSON-Objekt wird angezeigt:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Die Werte AWS und Resource sind für jedes Projekt eindeutig.

  5. Kopieren Sie die spezifischen Werte von AWS und Resource aus der angezeigten JSON-Datei in das folgende JSON-Snippet:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }
    

    Die Werte der Platzhalter im vorherigen JSON-Format haben das folgende Format:

    • AWS ist ein numerischer Wert, der Ihr Amazon Web Services-Projekt darstellt. Beispiel: "aws:SourceAccount": "1234567890".
    • RESOURCE ist eine Amazon Resource Number (ARN), die diese Warteschlange identifiziert. Beispiel: "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN ist ein ARN, der den Quell-Bucket identifiziert. Beispiel: "aws:SourceArn": "arn:aws:s3:::example-aws-bucket" Sie finden den ARN eines Buckets in der AWS-Konsole auf der Seite mit den Bucket-Details auf dem Tab Eigenschaften.
  6. Ersetzen Sie die im Abschnitt Zugriffsrichtlinie angezeigte JSON-Datei durch die oben aktualisierte JSON-Datei.

  7. Klicken Sie auf Warteschlange erstellen.

Notieren Sie sich nach Abschluss des Vorgangs den Amazon Resource Name (ARN) der Warteschlange. Der ARN hat das folgende Format:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Benachrichtigungen im S3-Bucket aktivieren

  1. Rufen Sie in der AWS-Konsole die Seite S3 auf.

  2. Wählen Sie in der Liste Buckets den Quell-Bucket aus.

  3. Wählen Sie den Tab Eigenschaften aus.

  4. Klicken Sie im Abschnitt Ereignisbenachrichtigungen auf Terminbenachrichtigung erstellen.

  5. Geben Sie einen Namen für das Ereignis ein.

  6. Wählen Sie im Bereich Ereignistypen die Option Alle Erstellungsereignisse für Objekte aus.

  7. Wählen Sie als Ziel die Option SQS-Warteschlange und dann die Warteschlange aus, die Sie für diese Übertragung erstellt haben.

  8. Klicken Sie auf Änderungen speichern.

Berechtigungen konfigurieren

Folgen Sie der Anleitung unter Zugriff auf eine Quelle konfigurieren: Amazon S3, um entweder eine Zugriffsschlüssel-ID und einen geheimen Schlüssel oder eine föderierte Identität zu erstellen.

Ersetzen Sie die JSON-Datei für benutzerdefinierte Berechtigungen durch Folgendes:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::AWS_BUCKET_NAME",
                "arn:aws:s3:::AWS_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Notieren Sie nach der Erstellung die folgenden Informationen:

  • Notieren Sie sich für Nutzer die Zugriffsschlüssel-ID und den geheimen Schlüssel.
  • Notieren Sie sich für eine föderierte Identität den Amazon Resource Name (ARN) im Format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud Console verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite Übertragungsjob erstellen auf.

    Gehen Sie zu Übertragungsjob erstellen

  2. Wählen Sie Amazon S3 als Quelltyp und Cloud Storage als Ziel aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Geben Sie den Namen des S3-Buckets ein. Der Bucket-Name ist der Name, der in der AWS Management Console angezeigt wird. Beispiel: my-aws-bucket

  5. Wählen Sie Ihre Authentifizierungsmethode aus und geben Sie die angeforderten Informationen ein, die Sie im vorherigen Abschnitt erstellt und notiert haben.

  6. Geben Sie den zuvor erstellten ARN der Amazon SQS-Warteschlange ein. Sie verwendet das folgende Format:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Legen Sie gegebenenfalls Filter fest und klicken Sie dann auf Nächster Schritt.

  8. Wählen Sie den Cloud Storage-Ziel-Bucket und optional den Pfad aus.

  9. Geben Sie optional eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, wird die Übertragung sofort gestartet und so lange ausgeführt, bis sie manuell beendet wird.

  10. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  11. Klicken Sie auf Erstellen.

Nach der Erstellung wird der Übertragungsjob ausgeführt und ein Event-Listener wartet auf Benachrichtigungen in der SQS-Warteschlange. Auf der Seite mit den Jobdetails wird pro Stunde ein Vorgang angezeigt, einschließlich Details zu den für jeden Job übertragenen Daten.

REST

Zum Erstellen einer ereignisgesteuerten Übertragung mit der REST API senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wird die Endzeit weggelassen, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer

def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")