Transferencias basadas en eventos desde Cloud Storage

Storage Transfer Service puede monitorizar las notificaciones de eventos en Google Cloudpara transferir automáticamente los datos que se hayan añadido o actualizado en un segmento de Cloud Storage. Más información sobre las ventajas de las transferencias basadas en eventos

Las transferencias basadas en eventos de Cloud Storage usan notificaciones de Pub/Sub para saber cuándo se han modificado o añadido objetos en el segmento de origen. No se detectan las eliminaciones de objetos. Si se elimina un objeto en el origen, no se elimina el objeto asociado en el segmento de destino.

Las transferencias basadas en eventos siempre usan un segmento de Cloud Storage como destino.

Configurar permisos

Además de los permisos necesarios para todos los trabajos de transferencia, las transferencias basadas en eventos requieren el rol Pub/Sub Subscriber.

  1. Busca el nombre del agente de servicio del Servicio de transferencia de Storage de tu proyecto:

    1. Ve a la página de referencia de googleServiceAccounts.get.

      Se abrirá un panel interactivo titulado Prueba este método.

    2. En el panel, en Parámetros de solicitud, introduce tu ID de proyecto. El proyecto que especifiques aquí debe ser el que utilices para gestionar el Servicio de transferencia de Storage, que puede ser diferente del proyecto del segmento de origen.

    3. Haz clic en la opción para ejecutar.

    La dirección de correo de tu agente de asistencia se devuelve como el valor de accountEmail. Copia este valor.

    El correo del agente de servicio utiliza el formato project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Asigna el rol Pub/Sub Subscriber al agente de servicio del Servicio de transferencia de Storage.

    consola de Cloud

    Sigue las instrucciones de Controlar el acceso a través de la consola Google Cloud para conceder el rol Pub/Sub Subscriber al servicio Storage Transfer Service. El rol se puede conceder a nivel de tema, suscripción o proyecto.

    gcloud CLI

    Sigue las instrucciones de Definir una política para añadir la siguiente vinculación:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configurar Pub/Sub

  1. Asegúrate de que cumples los requisitos previos para usar Pub/Sub con Cloud Storage.

  2. Crea una notificación de Pub/Sub para el segmento de Cloud Storage de origen.

    No puedes gestionar las notificaciones de Pub/Sub con la Google Cloud consola. Usa gcloud CLI o una de las bibliotecas de cliente disponibles.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Crea una suscripción de extracción para el tema. Debe crear una suscripción independiente para cada trabajo de transferencia.

    En el siguiente ejemplo se muestra el comando de Google Cloud CLI para crear una suscripción de extracción. Para obtener instrucciones sobre la consola y el código de la biblioteca de cliente, consulta Crear una suscripción pull.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Crear una tarea de transferencia

Puedes usar la API REST o la Google Cloud consola para crear un trabajo de transferencia basado en eventos.

No incluyas información sensible, como información personal identificable (IPI) o datos de seguridad, en el nombre del trabajo de transferencia. Los nombres de recursos pueden propagarse a los nombres de otros recursos de Google Cloud y pueden exponerse a sistemas internos de Google fuera de tu proyecto.

consola de Cloud

  1. Ve a la página Crear tarea de transferencia de la Google Cloud consola.

    Ve a Crear tarea de transferencia.

  2. Selecciona Cloud Storage como origen y destino.

  3. En Modo de programación, selecciona Basado en eventos y haz clic en Siguiente paso.

  4. Seleccione el segmento de origen de esta transferencia.

  5. En la sección Secuencia de eventos, introduzca el nombre de la suscripción:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Si quieres, define los filtros que quieras y, a continuación, haz clic en Paso siguiente.

  7. Seleccione el contenedor de destino de esta transferencia.

  8. Si quiere, puede introducir una hora de inicio y de finalización para la transferencia. Si no especificas una hora, la transferencia se iniciará inmediatamente y se ejecutará hasta que se detenga manualmente.

  9. Especifica las opciones de transferencia que quieras. Para obtener más información, consulta la página Crear transferencias.

  10. Haz clic en Crear.

Una vez creado, el trabajo de transferencia empieza a ejecutarse y un listener de eventos espera las notificaciones de la suscripción de Pub/Sub. En la página de detalles del trabajo se muestra una operación por hora y se incluyen detalles sobre los datos transferidos de cada trabajo.

REST

Para crear una transferencia basada en eventos mediante la API REST, envía el siguiente objeto JSON al endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Los elementos eventStreamStartTime y eventStreamExpirationTime son opcionales. Si se omite la hora de inicio, la transferencia comenzará inmediatamente. Si se omite la hora de finalización, la transferencia continuará hasta que se detenga manualmente.

Bibliotecas de cliente

Go

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Go del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Java del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Node.js del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Para saber cómo instalar y usar la biblioteca de cliente de Servicio de transferencia de Storage, consulta Bibliotecas de cliente de Servicio de transferencia de Storage. Para obtener más información, consulta la documentación de referencia de la API Python del Servicio de transferencia de Storage.

Para autenticarte en el Servicio de transferencia de Storage, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Monitorizar una transferencia basada en eventos

Cuando crea una transferencia basada en eventos, el Servicio de transferencia de Storage crea un trabajo de transferencia. Cuando se alcanza la hora de inicio, se empieza a ejecutar una operación de transferencia y un listener de eventos espera las notificaciones de la cola de Pub/Sub.

La operación de transferencia se ejecuta con el estado in progress durante aproximadamente 24 horas. Transcurridas 24 horas, la operación finaliza y comienza una nueva. Se crea una operación nueva cada 24 horas hasta que se alcanza la hora de finalización de la tarea de transferencia o hasta que se detiene manualmente.

Si se está transfiriendo un archivo cuando se programa la finalización de la operación, esta seguirá en curso hasta que el archivo se haya transferido por completo. Se inicia una nueva operación y las dos se ejecutan simultáneamente hasta que finaliza la antigua. La nueva operación gestionará los eventos que se detecten durante este periodo.

Para ver la operación actual y las operaciones completadas, sigue estos pasos:

Google Cloud consola

  1. Ve a la página Storage Transfer Service de la Google Cloud consola.

    Ir al Servicio de transferencia de Storage

  2. En la lista de tareas, selecciona la pestaña Todas o De nube a nube.

  3. Haz clic en el ID de la tarea de transferencia. La columna Modo de programación identifica todas las transferencias basadas en eventos frente a las transferencias por lotes.

  4. Selecciona la pestaña Operaciones. Se muestran los detalles de la operación actual y las operaciones completadas se indican en la tabla Historial de ejecuciones. Haz clic en cualquier operación completada para ver más detalles.

gcloud

Para monitorizar el progreso de un trabajo en tiempo real, usa gcloud transfer jobs monitor. La respuesta muestra la operación actual, la hora de inicio del trabajo, la cantidad de datos transferidos, los bytes omitidos y el número de errores.

gcloud transfer jobs monitor JOB_NAME

Para obtener el nombre de la operación actual, haz lo siguiente:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Para enumerar las operaciones actuales y completadas, haz lo siguiente:

gcloud transfer operations list --job-names=JOB_NAME

Para ver los detalles de una operación, sigue estos pasos:

gcloud transfer operations describe OPERATION_NAME