Mudar o tipo de tópico

É possível converter um tema de importação em um padrão ou vice-versa.

Converter um tópico de importação em um padrão

Para converter um tópico de importação em um padrão, limpe as configurações de transferência. Siga as etapas abaixo:

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico de importação.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Desmarque a opção Ativar transferência.

  5. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Substitua TOPIC_ID pelo ID do tópico.

Converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams

Para converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams, primeiro verifique se você atende a todos os pré-requisitos.

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico que você quer converter em um tema importado.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar transferência.

  5. Em "Origem de transferência", selecione Amazon Kinesis Data Streams.

  6. Digite os seguintes detalhes:

    • ARN do Kinesis Stream: o ARN do Kinesis Data Stream que você planeja ingerir no Pub/Sub. O formato ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN do consumidor do Kinesis: o ARN do recurso do consumidor registrado no fluxo de dados do AWS Kinesis. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN do papel da AWS: o ARN do papel da AWS. O formato ARN do papel é o seguinte: arn:aws:iam:${Account}:role/${RoleName}.

    • Conta de serviço: a conta de serviço que você criou em Criar uma conta de serviço no Google Cloud.

  7. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo abaixo:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico. Não é possível atualizar este campo.

    • KINESIS_STREAM_ARN é o ARN dos fluxos de dados do Kinesis que você planeja ingerir no Pub/Sub. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN é o ARN do recurso de consumidor registrado no AWS Kinesis Data Streams. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN é o ARN do papel da AWS. O formato ARN da função é o seguinte: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT é a conta de serviço que você criou em Criar uma conta de serviço no Google Cloud.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Antes de testar esta amostra, siga as instruções de configuração do C++ no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Para mais informações sobre ARNs, consulte Nomes de recursos da Amazon (ARNs) e Identificadores do IAM.

Converter um tópico padrão em um tópico de importação do Cloud Storage

Para converter um tópico padrão em um tópico de importação do Cloud Storage, verifique se você atende a todos os pré-requisitos.

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tema que você quer converter em um tema de importação do Cloud Storage.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar transferência.

  5. Em "Origem de transferência", selecione Google Cloud Storage.

  6. No bucket do Cloud Storage, clique em Procurar.

    A página Selecionar bucket é aberta. Selecione uma das seguintes opções:

    • Selecione um bucket de qualquer projeto apropriado.

    • Clique no ícone de criação e siga as instruções na tela para criar um novo bucket. Depois de criar o bucket, selecione-o para o tópico de importação do Cloud Storage.

  7. Quando você especifica o bucket, o Pub/Sub verifica as permissões adequadas no bucket para a conta de serviço do Pub/Sub. Se houver problemas de permissão, uma mensagem de erro relacionada às permissões vai aparecer.

    Se você tiver problemas de permissão, clique em Definir permissões. Para mais informações, consulte Concessão de permissões do Cloud Storage à conta de serviço do Pub/Sub.

  8. Em Formato do objeto, selecione Texto, Avro ou Pub/Sub Avro.

    Se você selecionar Texto, poderá especificar um Delimitador para dividir objetos em mensagens.

    Para mais informações sobre essas opções, consulte Formato de entrada.

  9. Opcional. É possível especificar uma Hora mínima de criação de objeto para o tópico. Se definido, apenas os objetos criados após o tempo mínimo de criação de objetos serão processados.

    Para mais informações, consulte Tempo mínimo de criação de objetos.

  10. É necessário especificar um padrão Glob. Para ingerir todos os objetos no bucket, use ** como o padrão glob. Somente objetos que correspondem ao padrão fornecido são ingeridos.

    Para mais informações, consulte Corresponder a um padrão de glob.

  11. Mantenha as outras configurações padrão.
  12. Clique em Atualizar tópico.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para evitar a perda das configurações do tópico de importação, inclua todas elas sempre que atualizar o tópico. Se você deixar algo de fora, o Pub/Sub vai redefinir a configuração para o valor padrão original.

    Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo abaixo:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Substitua:

    • TOPIC_ID é o ID ou nome do tópico. Não é possível atualizar este campo.

    • BUCKET_NAME: especifica o nome de um bucket Por exemplo, prod_bucket. O nome do bucket não pode incluir o ID do projeto. Para criar um bucket, consulte Criar buckets.

    • INPUT_FORMAT: especifica o formato dos objetos que são ingeridos. Os valores podem ser text, avro ou pubsub_avro. Para mais informações sobre essas opções, consulte Formato de entrada.

    • TEXT_DELIMITER: especifica o delimitador para dividir objetos de texto em mensagens do Pub/Sub. Ele precisa ser um único caractere e só pode ser definido quando INPUT_FORMAT for text. O padrão é o caractere de nova linha (\n).

      Ao usar a CLI gcloud para especificar o delimitador, preste atenção ao processamento de caracteres especiais, como o \n de nova linha. Use o formato '\n' para garantir que o delimitador seja interpretado corretamente. Basta usar \n sem aspas ou escapar dos resultados em um delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para ser transferido. Ele precisa estar no formato UTC YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

      Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, é válida.

    • MATCH_GLOB: especifica o padrão de glob a ser correspondido para que um objeto seja ingerido. Ao usar a CLI gcloud, um glob de correspondência com caracteres * precisa ter o caractere * formatado como escape no formulário \*\*.txt ou o glob de correspondência inteiro precisa estar entre aspas "**.txt" ou '**.txt'. Para informações sobre a sintaxe com suporte para padrões glob, consulte a documentação do Cloud Storage.