Criar um tópico de importação do Cloud Storage

Um tópico de importação do Cloud Storage permite ingerir dados do Cloud Storage no Pub/Sub de forma contínua. Em seguida, você pode transmitir os dados para qualquer um dos destinos compatíveis com o Pub/Sub. O Pub/Sub detecta e ingere automaticamente novos objetos adicionados ao bucket do Cloud Storage.

O Cloud Storage é um serviço para o armazenamento de objetos no Google Cloud. Um objeto é um dado imutável composto de um arquivo em qualquer formato. Os objetos são armazenados em contêineres chamados de buckets. Os buckets também podem conter pastas gerenciadas, usadas para fornecer acesso expandido a grupos de objetos com um prefixo de nome compartilhado.

Para mais informações sobre o Cloud Storage, consulte a documentação do Cloud Storage.

Para mais informações sobre os tópicos de importação, consulte Sobre os tópicos de importação.

Antes de começar

Papéis e permissões necessários para gerenciar tópicos de importação do Cloud Storage

Para receber as permissões necessárias para criar e gerenciar um tópico de importação do Cloud Storage, peça ao administrador para conceder a você o Papel do IAM de editor do Pub/Sub (roles/pubsub.editor) no seu tópico ou projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar e gerenciar um tópico de importação do Cloud Storage. Para conferir as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar e gerenciar um tópico de importação do Cloud Storage:

  • Crie um tópico de importação: pubsub.topics.create
  • Exclua um tema de importação: pubsub.topics.delete
  • Receba um tema de importação: pubsub.topics.get
  • Listar um tema de importação: pubsub.topics.list
  • Publicar em um tópico de importação: pubsub.topics.publish
  • Atualize um tema de importação: pubsub.topics.update
  • Acesse a política do IAM para um tópico de importação: pubsub.topics.getIamPolicy
  • Configure a política do IAM para um tópico de importação: pubsub.topics.setIamPolicy

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

É possível configurar o controle de acesso no nível do projeto e do recurso individual.

A política de armazenamento de mensagens é compatível com o local do bucket

A política de armazenamento de mensagens do tópico do Pub/Sub precisa se sobrepor às regiões em que o bucket do Cloud Storage está localizado. Essa política determina onde o Pub/Sub pode armazenar os dados da sua mensagem.

  • Para buckets com o tipo de localização como região: a política precisa incluir essa região específica. Por exemplo, se o bucket estiver na região us-central1, a política de armazenamento de mensagens também precisará incluir us-central1.

  • Para buckets com o tipo de local como birregional ou multirregional: a política precisa incluir pelo menos uma região no local birregional ou multirregional. Por exemplo, se o bucket estiver na US multi-region, a política de armazenamento de mensagens poderá incluir us-central1, us-east1 ou qualquer outra região dentro da US multi-region.

    Se a política não incluir a região do bucket, a criação do tópico vai falhar. Por exemplo, se o bucket estiver em europe-west1 e a política de armazenamento de mensagens incluir apenas asia-east1, você vai receber um erro.

    Se a política de armazenamento de mensagens incluir apenas uma região que se sobrepõe ao local do bucket, a redundância de várias regiões poderá ser comprometida. Isso ocorre porque, se essa única região ficar indisponível, seus dados podem não estar acessíveis. Para garantir a redundância total, é recomendável incluir pelo menos duas regiões na política de armazenamento de mensagens que façam parte do local multirregional ou birregional do bucket.

Para mais informações sobre os locais de bucket, consulte a documentação.

Adicionar a função de editor do Pub/Sub à conta de serviço do Pub/Sub

É necessário atribuir o papel de editor do Pub/Sub à conta de serviço do Pub/Sub para que ele possa publicar no tópico de importação do Cloud Storage.

Ativar a publicação em todos os tópicos de importação do Cloud Storage

Escolha essa opção quando não houver um tópico de importação do Cloud Storage disponível no projeto.

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Ative a opção Incluir concessões de papel fornecidas pelo Google.

  3. Procure a conta de serviço do Pub/Sub com o formato:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Para essa conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outro papel.

  6. Pesquise e selecione a função de editor do Pub/Sub (roles/pubsub.publisher).

  7. Clique em Salvar.

Ativar a publicação em um único tópico de importação do Cloud Storage

Se você quiser conceder ao Pub/Sub a permissão para publicar em um tópico de importação do Cloud Storage específico que já existe, siga estas etapas:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico de importação do Cloud Storage.

    • PROJECT_NUMBER é o número do projeto. Para conferir o número do projeto, consulte Como identificar projetos.

Atribuir papéis do Cloud Storage à conta de serviço do Pub/Sub

Para criar um tópico de importação do Cloud Storage, a conta de serviço do Pub/Sub precisa ter permissão para ler o bucket específico do Cloud Storage. As seguintes permissões são necessárias:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Para atribuir essas permissões à conta de serviço do Pub/Sub, escolha um dos seguintes procedimentos:

  • Conceda permissões no nível do bucket. No bucket específico do Cloud Storage, conceda os papéis de Leitor de objetos legado do Storage (roles/storage.legacyObjectReader) e Leitor de bucket legado do Storage (roles/storage.legacyBucketReader) à conta de serviço do Pub/Sub.

  • Se você precisar conceder papéis no nível do projeto, conceda o papel de administrador do Storage (roles/storage.admin) no projeto que contém o bucket do Cloud Storage. Conceda esse papel à conta de serviço do Pub/Sub.

Permissões de bucket

Siga as etapas abaixo para conceder os papéis de leitor de objetos legados do Storage (roles/storage.legacyObjectReader) e de leitor de bucket legado do Storage (roles/storage.legacyBucketReader) à conta de serviço do Pub/Sub no nível do bucket:

  1. No Console do Google Cloud, acesse a página Cloud Storage.

    Acesse o Cloud Storage

  2. Clique no bucket do Cloud Storage de onde você quer ler mensagens e importar para o tópico de importação do Cloud Storage.

    A página Detalhes do bucket é aberta.

  3. Na página Detalhes do bucket, clique na guia Permissões.

  4. Na guia Permissões > Visualizar por principais, clique em Conceder acesso.

    A página Conceder acesso é aberta.

  5. Na seção Adicionar principais, insira o nome da sua conta de serviço do Pub/Sub.

    O formato da conta de serviço é service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por exemplo, para um projeto com PROJECT_NUMBER=112233445566, a conta de serviço tem o formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. No menu suspenso Atribuir papéis > Selecionar um papel, digite Object Reader e selecione a função Leitor de objetos legados do Storage.

  7. Clique em Adicionar outro papel.

  8. No menu suspenso Selecionar um papel, digite Bucket Reader e selecione o papel Leitor de bucket legados do Storage.

  9. Clique em Salvar.

Permissões do projeto

Siga estas etapas para conceder o papel de administrador do Storage (roles/storage.admin) no nível do projeto:

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Na guia Permissões > Visualizar por principais, clique em Conceder acesso.

    A página Conceder acesso é aberta.

  3. Na seção Adicionar principais, insira o nome da sua conta de serviço do Pub/Sub.

    O formato da conta de serviço é service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por exemplo, para um projeto com PROJECT_NUMBER=112233445566, a conta de serviço tem o formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. No menu suspenso Atribuir papéis > Selecionar um papel, digite Storage Admin e selecione o papel Administrador do Storage.

  5. Clique em Salvar.

Para mais informações sobre o IAM do Cloud Storage, consulte Identity and Access Management do Cloud Storage.

Propriedades dos tópicos de importação do Cloud Storage

Para mais informações sobre as propriedades comuns em todos os tópicos, consulte Propriedades de um tópico.

Nome do bucket

É o nome do bucket do Cloud Storage de onde o Pub/Sub lê os dados publicados em um tópico de importação do Cloud Storage.

Formato da entrada

Ao criar um tópico de importação do Cloud Storage, é possível especificar o formato dos objetos a serem ingeridos como Texto, Avro ou Pub/Sub Avro.

  • Texto. Os objetos são considerados como detentores de dados com texto simples. Esse formato de entrada tenta ingerir todos os objetos no bucket, desde que o objeto atenda ao tempo mínimo de criação de objeto e corresponda aos critérios de padrão glob.

    Delimitador. Também é possível especificar um delimitador pelo qual os objetos são divididos em mensagens. Se não for definido, o padrão será o caractere de nova linha (\n). O delimitador precisa ser apenas um caractere.

  • Avro. Os objetos estão no formato binário do Apache Avro. Qualquer objeto que não esteja em um formato válido do Apache Avro não é ingerido. Estas são as limitações do Avro:

    • As versões 1.1.0 e 1.2.0 do Avro não são compatíveis.
    • O tamanho máximo de um bloco Avro é de 16 MB.
  • Pub/Sub Avro. Os objetos estão no formato binário do Apache Avro com um esquema correspondente ao de um objeto gravado no Cloud Storage usando uma assinatura do Cloud Storage do Pub/Sub com o formato de arquivo Avro. Confira algumas diretrizes importantes para o Avro do Pub/Sub:

    • O campo de dados do registro Avro é usado para preencher o campo de dados da mensagem do Pub/Sub gerada.

    • Se a opção write_metadata for especificada para a assinatura do Cloud Storage, todos os valores no campo de atributos serão preenchidos como os atributos da mensagem do Pub/Sub gerada.

    • Se uma chave de ordenação for especificada na mensagem original gravada no Cloud Storage, esse campo será preenchido como um atributo com o nome original_message_ordering_key na mensagem gerada do Pub/Sub.

Hora mínima de criação de objeto

Você pode especificar um tempo mínimo de criação de objeto ao criar um tópico de importação do Cloud Storage. Somente os objetos criados a partir desse carimbo de data/hora são ingeridos. Esse carimbo de data/hora precisa ser fornecido em um formato como YYYY-MM-DDThh:mm:ssZ. Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, inclusive, é válida.

Corresponder ao padrão glob

Você pode especificar um padrão de correspondência ao criar um tópico de importação do Cloud Storage. Somente objetos com nomes que correspondem a esse padrão são ingeridos. Por exemplo, para ingerir todos os objetos com o sufixo .txt, especifique o padrão glob como **.txt.

Para informações sobre a sintaxe com suporte para padrões glob, consulte a documentação do Cloud Storage.

Criar um tópico de importação do Cloud Storage

Verifique se você concluiu os seguintes procedimentos:

Criar o tópico e a assinatura separadamente, mesmo que em rápida sucessão, pode levar à perda de dados. Há um curto período em que o tópico existe sem uma assinatura. Se algum dado for enviado para o tópico durante esse período, ele será perdido. Ao criar o tópico primeiro, criar a assinatura e, em seguida, converter o tópico em um tópico de importação, você garante que nenhuma mensagem seja perdida durante o processo de importação.

Para criar um tópico de importação do Cloud Storage, siga estas etapas:

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Selecione Criar tópico.

    A página de detalhes do tópico é aberta.

  3. No campo ID do tópico, insira um ID para o tópico de importação do Cloud Storage.

    Para mais informações sobre como nomear tópicos, consulte as diretrizes de nomenclatura.

  4. Selecione Adicionar uma assinatura padrão.

  5. Selecione Ativar ingestão.

  6. Em "Origem de transferência", selecione Google Cloud Storage.

  7. No bucket do Cloud Storage, clique em Procurar.

    A página Selecionar bucket é aberta. Selecione uma das seguintes opções:

    • Selecione um bucket de qualquer projeto apropriado.

    • Clique no ícone de criação e siga as instruções na tela para criar um novo bucket. Depois de criar o bucket, selecione-o para o tópico de importação do Cloud Storage.

  8. Quando você especifica o bucket, o Pub/Sub verifica as permissões adequadas no bucket para a conta de serviço do Pub/Sub. Se houver problemas de permissão, uma mensagem semelhante a esta vai aparecer:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Se você tiver problemas de permissão, clique em Definir permissões. Para mais informações, consulte Conceder permissões do Cloud Storage à conta de serviço do Pub/Sub.

  9. Em Formato do objeto, selecione Texto, Avro ou Pub/Sub Avro.

    Se você selecionar Texto, poderá especificar um Delimitador para dividir objetos em mensagens.

    Para mais informações sobre essas opções, consulte Formato de entrada.

  10. Opcional. É possível especificar uma Hora mínima de criação de objeto para o tópico. Se definido, apenas os objetos criados após o tempo mínimo de criação de objetos serão processados.

    Para mais informações, consulte Tempo mínimo de criação de objetos.

  11. É necessário especificar um padrão Glob. Para ingerir todos os objetos no bucket, use ** como o padrão glob. Se definido, apenas objetos que correspondem ao padrão fornecido serão ingeridos.

    Para mais informações, consulte Corresponder a um padrão de glob.

  12. Mantenha as outras configurações padrão.
  13. Selecione Criar tópico.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    No comando, apenas TOPIC_ID, a flag --cloud-storage-ingestion-bucket e a flag --cloud-storage-ingestion-input-format são obrigatórias. As flags restantes são opcionais e podem ser omitidas.

    Substitua:

    • TOPIC_ID: o nome ou ID do tópico.

    • BUCKET_NAME: especifica o nome de um bucket Por exemplo, prod_bucket. O nome do bucket não pode incluir o ID do projeto. Para criar um bucket, consulte Criar buckets.

    • INPUT_FORMAT: especifica o formato dos objetos que são ingeridos. Os valores podem ser text, avro ou pubsub_avro. Para mais informações sobre essas opções, consulte Formato de entrada.

    • TEXT_DELIMITER: especifica o delimitador para dividir objetos de texto em mensagens do Pub/Sub. Ele precisa ser um único caractere e só pode ser definido quando INPUT_FORMAT for text. O padrão é o caractere de nova linha (\n).

      Ao usar a CLI gcloud para especificar o delimitador, preste atenção ao processamento de caracteres especiais, como o \n de nova linha. Use o formato '\n' para garantir que o delimitador seja interpretado corretamente. Basta usar \n sem aspas ou escapar dos resultados em um delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para ser transferido. Ele precisa estar no formato UTC YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

      Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, é válida.

    • MATCH_GLOB: especifica o padrão de glob a ser correspondido para que um objeto seja ingerido. Ao usar a CLI gcloud, um glob de correspondência com caracteres * precisa ter o caractere * formatado como escape no formulário \*\*.txt ou o glob de correspondência inteiro precisa estar entre aspas "**.txt" ou '**.txt'. Para informações sobre a sintaxe com suporte para padrões glob, consulte a documentação do Cloud Storage.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Antes de testar esta amostra, siga as instruções de configuração do C++ no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Se você tiver problemas, consulte Solução de problemas em um tópico de importação do Cloud Storage.

Editar um tópico de importação do Cloud Storage

É possível editar um tópico de importação do Cloud Storage para atualizar as propriedades dele.

Por exemplo, para reiniciar a transferência, você pode mudar o bucket ou atualizar o tempo mínimo de criação de objetos.

Para editar um tópico de importação do Cloud Storage, siga estas etapas:

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico de importação do Cloud Storage.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Atualize os campos que você quer mudar.

  5. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para evitar a perda das configurações do tópico de importação, inclua todas elas sempre que atualizar o tópico. Se você deixar algo de fora, o Pub/Sub vai redefinir a configuração para o valor padrão original.

    Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo abaixo:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico. Não é possível atualizar este campo.

    • BUCKET_NAME: especifica o nome de um bucket Por exemplo, prod_bucket. O nome do bucket não pode incluir o ID do projeto. Para criar um bucket, consulte Criar buckets.

    • INPUT_FORMAT: especifica o formato dos objetos que são ingeridos. Os valores podem ser text, avro ou pubsub_avro. Consulte Formato de entrada para mais informações sobre essas opções.

    • TEXT_DELIMITER: especifica o delimitador para dividir objetos de texto em mensagens do Pub/Sub. Ele precisa ser um único caractere e só pode ser definido quando INPUT_FORMAT for text. O padrão é o caractere de nova linha (\n).

      Ao usar a CLI gcloud para especificar o delimitador, preste atenção ao processamento de caracteres especiais, como o \n de nova linha. Use o formato '\n' para garantir que o delimitador seja interpretado corretamente. Basta usar \n sem aspas ou escapar dos resultados em um delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para ser transferido. Ele precisa estar no formato UTC YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

      Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, é válida.

    • MATCH_GLOB: especifica o padrão de glob a ser correspondido para que um objeto seja ingerido. Ao usar a CLI gcloud, um glob de correspondência com caracteres * precisa ter o caractere * formatado como escape no formulário \*\*.txt ou o glob de correspondência inteiro precisa estar entre aspas "**.txt" ou '**.txt'. Para informações sobre a sintaxe com suporte para padrões glob, consulte a documentação do Cloud Storage.

Cotas e limites para tópicos de importação do Cloud Storage

O throughput do editor para tópicos de importação é limitado pela cota de publicação do tópico. Para mais informações, consulte Cotas e limites do Pub/Sub.

A seguir