Criar um tópico de importação do Confluent Cloud

Um tópico de importação do Confluent Cloud permite ingerir dados continuamente do Confluent Cloud como uma fonte externa no Pub/Sub. Em seguida, você pode transmitir os dados para qualquer um dos destinos compatíveis com o Pub/Sub.

Este documento mostra como criar e gerenciar tópicos de importação do Confluent Cloud. Para criar um tópico padrão, consulte Criar um tópico padrão.

Para mais informações sobre os tópicos de importação, consulte Sobre os tópicos de importação.

Antes de começar

Papéis e permissões necessárias

Para receber as permissões necessárias para criar e gerenciar tópicos de importação do Confluent Cloud, peça ao administrador para conceder a você o papel do IAM de Editor do Pub/Sub (roles/pubsub.editor) no seu tópico ou projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para criar e gerenciar tópicos de importação do Confluent Cloud. Para conferir as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para criar e gerenciar tópicos de importação da Confluent Cloud:

  • Crie um tópico de importação: pubsub.topics.create
  • Exclua um tema de importação: pubsub.topics.delete
  • Receba um tema de importação: pubsub.topics.get
  • Listar um tema de importação: pubsub.topics.list
  • Publicar em um tópico de importação: pubsub.topics.publish
  • Atualizar um tema de importação: pubsub.topics.update
  • Acesse a política do IAM para um tópico de importação: pubsub.topics.getIamPolicy
  • Configure a política do IAM para um tópico de importação: pubsub.topics.setIamPolicy

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

É possível configurar o controle de acesso no nível do projeto e do recurso individual.

Configurar a identidade federada para acessar o Confluent Cloud

A federação de identidade da carga de trabalho permite que os serviços Google Cloud acessam cargas de trabalho executadas fora do Google Cloud. Com a federação de identidade, não é necessário manter ou transmitir credenciais para Google Cloud acessar seus recursos em outras nuvens. Em vez disso, use as identidades das cargas de trabalho para autenticar em Google Cloud e acessar recursos.

Crie uma conta de serviço em Google Cloud

Essa é uma etapa opcional. Se você já tiver uma conta de serviço, poderá usá-la neste procedimento em vez de criar uma nova. Se você estiver usando uma conta de serviço atual, acesse Gravar o ID exclusivo da conta de serviço para a próxima etapa.

Para tópicos de importação do Confluent Cloud, o Pub/Sub usa a conta de serviço como a identidade para acessar recursos do Confluent Cloud.

Para mais informações sobre como criar uma conta de serviço, incluindo pré-requisitos, papéis e permissões obrigatórios e diretrizes de nomenclatura, consulte Criar contas de serviço. Depois de criar uma conta de serviço, talvez seja necessário aguardar 60 segundos ou mais para usá-la. Esse comportamento ocorre porque as operações de leitura têm consistência eventual e pode levar algum tempo para a nova conta de serviço ficar visível.

Gravar o ID exclusivo da conta de serviço

Você precisa de um ID exclusivo da conta de serviço para configurar o provedor de identidade e o pool no console do Confluent Cloud.

  1. No console do Google Cloud, acesse a página de detalhes da Conta de serviço.

    Acessar a conta de serviço

  2. Clique na conta de serviço que você acabou de criar ou que planeja usar.

  3. Na página Detalhes da conta de serviço, registre o número do ID exclusivo.

    Você precisa do ID como parte do fluxo de trabalho para configurar o provedor de identidade e o pool no console do Confluent Cloud.

Adicionar o papel de criador do token da conta de serviço à conta de serviço do Pub/Sub

O papel Criador de token da conta de serviço (roles/iam.serviceAccountTokenCreator) permite que os participantes criem credenciais de curta duração para uma conta de serviço. Esses tokens ou credenciais são usados para falsificar a conta de serviço.

Para mais informações sobre representação da conta de serviço, consulte Representação da conta de serviço.

Também é possível adicionar o papel de editor do Pub/Sub (roles/pubsub.publisher) durante esse procedimento. Para mais informações sobre o papel e por que ele está sendo adicionado, consulte Adicionar o papel de editor do Pub/Sub à conta de serviço do Pub/Sub.

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Clique na caixa de seleção Incluir concessões de papel fornecidas pelo Google.

  3. Procure a conta de serviço com o formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Para essa conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outro papel.

  6. Pesquise e clique em Papel de criador do token da conta de serviço (roles/iam.serviceAccountTokenCreator).

  7. Clique em Salvar.

Criar um provedor de identidade na Confluent Cloud

Para autenticar no Confluent Cloud, a conta de serviço do Google Cloud precisa de um pool de identidade. Primeiro, crie um provedor de identidade no Confluent Cloud.

Para mais informações sobre como criar um provedor de identidade no Confluent Cloud, acesse a página Adicionar um provedor de identidade OAuth/OIDC.

  1. Faça login no console do Confluent Cloud.

  2. No menu, clique em Contas e acesso.

  3. Clique em Identidades de carga de trabalho.

  4. Clique em Adicionar provedor.

  5. Clique em OAuth/OIDC e em Próxima.

  6. Clique em Outro provedor OIDC e em Próxima.

  7. Forneça um nome e uma descrição do objetivo do provedor de identidade.

  8. Clique em Mostrar configuração avançada.

  9. No campo URI do emissor, digite https://accounts.google.com.

  10. No campo URI do JWKs, digite https://www.googleapis.com/oauth2/v3/certs.

  11. Clique em Validar e salvar.

Crie um pool de identidades e conceda as funções adequadas na Confluent Cloud

Você precisa criar um pool de identidade no seu perfil de identidade e conceder os papéis necessários para permitir que a conta de serviço do Pub/Sub autentique e leia os tópicos do Confluent Cloud Kafka.

Verifique se o cluster foi criado na Confluent Cloud antes de prosseguir com a criação de um pool de identidade.

Para mais informações sobre como criar um pool de identidade, acesse a página Usar pools de identidade com seu provedor de identidade OAuth/OIDC.

  1. Faça login no console do Confluent Cloud.

  2. No menu, clique em Contas e acesso.

  3. Clique em Identidades de carga de trabalho.

  4. Clique no provedor de identidade que você criou em Criar um provedor de identidade na Confluent Cloud.

  5. Clique em Adicionar pool.

  6. Informe um nome e uma descrição para o pool de identidade.

  7. Defina Declaração de identidade como claims.

  8. Em Definir filtros, clique na guia Avançado. Insira o seguinte código:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    Substitua <SERVICE_ACCOUNT_UNIQUE_ID> pelo ID exclusivo da sua conta de serviço encontrado em Gravar o ID exclusivo da conta de serviço.

  9. Clique em Próxima.

  10. Clique em Adicionar nova permissão. Em seguida, clique em Próxima.

  11. No cluster relevante, clique em Adicionar atribuição de função.

  12. Clique na função Operador e em Adicionar.

    Esse papel concede o Pub/Sub. Acesso da conta de serviço ao cluster que contém o tópico do Confluent Kafka que você quer transferir para o Pub/Sub.

  13. Em "Cluster", clique em Tópicos. Em seguida, clique em Adicionar atribuição de função.

  14. Selecione a função DeveloperRead.

  15. Clique na opção adequada e especifique o tema ou prefixo. Por exemplo, Assunto específico, Regra de prefixo ou Todos os assuntos.

  16. Clique em Adicionar.

  17. Clique em Próxima.

  18. Clique em Validar e salvar.

Adicionar o papel de editor do Pub/Sub ao principal do Pub/Sub

Para ativar a publicação, atribua um papel de editor à conta de serviço do Pub/Sub para que ele possa publicar no tópico de importação do Confluent Cloud.

Ativar a publicação em todos os tópicos

Use esse método se você não tiver criado tópicos de importação do Confluent Cloud.

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Clique na caixa de seleção Incluir concessões de papel fornecidas pelo Google.

  3. Procure a conta de serviço com o formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Para essa conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outro papel.

  6. Pesquise e clique na função de editor do Pub/Sub (roles/pubsub.publisher).

  7. Clique em Salvar.

Ativar a publicação em um único tópico

Use esse método somente se o tópico de importação do Confluent Cloud já existir.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Substitua:

    • TOPIC_ID: o ID do tópico de importação do Confluent Cloud.

    • PROJECT_NUMBER: o número do projeto. Para conferir o número do projeto, consulte Como identificar projetos.

Adicionar a função de usuário da conta de serviço à conta de serviço

O papel de usuário da conta de serviço (roles/iam.serviceAccountUser) inclui a permissão iam.serviceAccounts.actAs, que permite que um principal anexe uma conta de serviço às configurações de ingestão do tópico de importação do Confluent Cloud e use essa conta de serviço para identidade federada.

  1. No console do Google Cloud, abra a página IAM.

    Acessar IAM

  2. Para o principal que está emitindo as chamadas de criação ou atualização de tópicos, clique no botão Editar principal.

  3. Se necessário, clique em Adicionar outro papel.

  4. Pesquise e clique em Papel de usuário da conta de serviço (roles/iam.serviceAccountUser).

  5. Clique em Salvar.

Usar tópicos de importação do Confluent Cloud

Você pode criar um novo tópico de importação ou editar um já existente.

Considerações

  • Criar o tópico e a assinatura separadamente, mesmo que em rápida sequência, pode levar à perda de dados. Há um curto período em que o tópico existe sem uma assinatura. Se algum dado for enviado para o tópico durante esse período, ele será perdido. Ao criar o tópico primeiro, criar a assinatura e, em seguida, converter o tópico em um tópico de importação, você garante que nenhuma mensagem seja perdida durante o processo de importação.

  • Se você precisar recriar o tópico do Kafka de um tópico de importação com o mesmo nome, não basta excluir e recriar o tópico do Kafka. Essa ação pode invalidar o gerenciamento de deslocamento do Pub/Sub, o que pode levar à perda de dados. Para evitar isso, siga estas etapas:

    • Exclua o tópico de importação do Pub/Sub.
    • Exclua o tópico do Kafka.
    • Crie o tópico Kafka.
    • Crie o tópico de importação do Pub/Sub.
  • Os dados de um tópico do Confluent Cloud Kafka são sempre lidos a partir do offset mais antigo.

Criar um tópico de importação do Confluent Cloud

Para saber mais sobre as propriedades associadas a um tópico, consulte Propriedades de um tópico.

Verifique se você concluiu os seguintes procedimentos:

Para criar um tópico de importação da Confluent Cloud, siga estas etapas:

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Selecione Criar tópico.

  3. No campo ID do tópico, insira um ID para o tópico de importação.

    Para mais informações sobre a nomenclatura de tópicos, consulte as diretrizes de nomenclatura.

  4. Selecione Adicionar uma assinatura padrão.

  5. Selecione Ativar ingestão.

  6. Em "Origem de transferência", selecione Confluent Cloud.

  7. Digite os seguintes detalhes:

    1. Servidor de inicialização: o servidor de inicialização do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub. O formato é o seguinte: hostname:port.

    2. ID do cluster: o ID do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub.

    3. Tópico: o nome do tópico do Kafka que você está ingerindo no Pub/Sub.

    4. ID do pool de identidades: o ID do pool de identidades usado para autenticar com o Confluent Cloud.

    5. Conta de serviço:a conta de serviço que você criou em Criar uma conta de serviço no Google Cloud.

  8. Selecione Criar tópico.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
        --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
        --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
        --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
        --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
        --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Substitua:

    • TOPIC_ID: o nome ou ID do tópico do Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: o servidor de inicialização do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub. O formato é o seguinte: hostname:port.
    • CONFLUENT_CLUSTER_ID: o ID do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub.
    • CONFLUENT_TOPIC: o nome do tópico do Kafka que você está ingerindo no Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: o ID do pool de identidade usado para autenticação com o Confluent Cloud.
    • PUBSUB_SERVICE_ACCOUNT: a conta de serviço criada em Criar uma conta de serviço no Google Cloud.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceConfluentCloud{
				BootstrapServer:   bootstrapServer,
				ClusterID:         clusterID,
				Topic:             confluentTopic,
				IdentityPoolID:    poolID,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

C++

Antes de testar esta amostra, siga as instruções de configuração do C++ no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Se você tiver problemas, consulte Como resolver problemas em um tópico de importação do Confluent Cloud.

Editar um tópico de importação do Confluent Cloud Hubs

Para editar as configurações da fonte de dados de ingestão de um tópico de importação do Confluent Cloud, siga estas etapas:

Console

  1. No console do Google Cloud, acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico de importação do Confluent Cloud.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Atualize os campos que você quer mudar.

  5. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    Para evitar a perda das configurações do tópico de importação, inclua todas elas sempre que atualizar o tópico. Se você deixar algo de fora, o Pub/Sub vai redefinir a configuração para o valor padrão original.

  2. Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo a seguir:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Substitua:

    • TOPIC_ID: o nome ou ID do tópico do Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: o servidor de inicialização do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub. O formato é o seguinte: hostname:port.
    • CONFLUENT_CLUSTER_ID: o ID do cluster que contém o tópico do Kafka que você está ingerindo no Pub/Sub.
    • CONFLUENT_TOPIC: o nome do tópico do Kafka que você está ingerindo no Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: o ID do pool de identidade usado para autenticação com o Confluent Cloud.
    • CONFLUENT_IDENTITY_POOL_ID: a conta de serviço criada em Criar uma conta de serviço no Google Cloud.

Cotas e limites

O throughput do editor para tópicos de importação é limitado pela cota de publicação do tópico. Para mais informações, consulte Cotas e limites do Pub/Sub.

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.