Transferências orientadas a eventos do Cloud Storage

O Serviço de transferência do Cloud Storage pode detectar notificações de eventos no Google Cloud para transferir automaticamente dados que foram adicionados ou atualizados em um bucket do Cloud Storage. Saiba mais sobre os benefícios das transferências baseadas em eventos.

As transferências orientadas por eventos do Cloud Storage usam notificações do Pub/Sub para saber quando os objetos no bucket de origem foram modificados ou adicionados. As exclusões de objetos não são detectadas. Excluir um objeto na origem não exclui o objeto associado no bucket de destino.

As transferências baseadas em eventos sempre usam um bucket do Cloud Storage como destino.

Configurar permissões

  1. Encontre o nome do agente de serviço do Serviço de transferência do Cloud Storage para seu projeto:

    1. Acesse a página de referência de googleServiceAccounts.get.

      Você verá um painel interativo com o título Testar este método.

    2. No painel, em Parâmetros de solicitação, insira o ID do projeto. O projeto especificado aqui precisa ser o projeto que você está usando para gerenciar o Serviço de transferência do Cloud Storage, que pode ser diferente do projeto do bucket de origem.

    3. Clique em Execute.

    O e-mail do agente de serviço é retornado como o valor de accountEmail. Copie esse valor.

    O e-mail do agente de serviço tem o formato project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Conceda esse papel ao Pub/Sub Subscriber agente do Serviço de transferência do Cloud Storage.

    console do Cloud

    Siga as instruções em Como controlar o acesso usando o console do Google Cloud para conceder o papel Pub/Sub Subscriber ao serviço de serviço de transferência do Cloud Storage. O papel pode ser concedido no nível do tópico, da assinatura ou do projeto.

    CLI gcloud

    Siga as instruções em Como definir uma política para adicionar a seguinte vinculação:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configurar o Pub/Sub

  1. Verifique se você atende aos Pré-requisitos para usar o Pub/Sub com o Cloud Storage.

  2. Configurar notificação do Pub/Sub para o Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
  3. Crie uma assinatura de pull para o tópico. É necessário criar uma assinatura separada para cada job de transferência.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Criar um job de transferência

Use a API REST ou o console do Google Cloud para criar um job de transferência baseado em eventos.

Não inclua informações sensíveis, como informações de identificação pessoal (PII, na sigla em inglês) ou dados de segurança no nome do job de transferência. Os nomes dos recursos podem ser propagados para os nomes de outros Google Cloud e podem ser expostos aos sistemas internos do Google fora do seu projeto.

console do Cloud

  1. Acesse a página Criar job de transferência no console do Google Cloud .

    Acesse Criar job de transferência.

  2. Selecione Cloud Storage como origem e destino.

  3. No Modo de programação, selecione Baseado em eventos e clique em Próxima etapa.

  4. Selecione o bucket de origem dessa transferência.

  5. Na seção Fluxo de eventos, insira o nome da assinatura:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Se quiser, defina qualquer filtro e clique em Próxima etapa.

  7. Selecione o bucket de destino dessa transferência.

  8. Também é possível digitar um horário de início e de término para a transferência. Se você não especificar um horário, a transferência vai ser iniciada imediatamente e executada até ser interrompida manualmente.

  9. Especifique as opções de transferência. Veja mais informações na página Criar transferências.

  10. Clique em Criar.

Depois de criado, o job de transferência começa a ser executado e um listener de eventos aguarda notificações na assinatura do Pub/Sub. A página de detalhes do job mostra uma operação a cada hora e contém detalhes sobre os dados transferidos em cada job.

REST

Para criar uma transferência baseada em eventos usando a API REST, envie o seguinte objeto JSON para o endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

O eventStreamStartTime e o eventStreamExpirationTime são opcionais. Se o horário de início for omitido, a transferência começará imediatamente. Se o horário de término for omitido, a transferência continuará até que seja interrompida manualmente.

Bibliotecas de cliente

Go

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Go do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Java do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Node.js do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Para saber como instalar e usar a biblioteca de cliente do Serviço de transferência do Cloud Storage, consulte Bibliotecas de clientes do Serviço de transferência do Cloud Storage. Para mais informações, consulte a Documentação de referência da API Python do Serviço de transferência do Cloud Storage.

Para autenticar o Serviço de transferência do Cloud Storage, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")