Crea un argomento di importazione Confluent Cloud

Un argomento di importazione Confluent Cloud ti consente di importare continuamente i dati da Confluent Cloud come origine esterna e in Pub/Sub. Poi puoi trasmettere i dati in streaming in una delle destinazioni supportate da Pub/Sub.

Questo documento spiega come creare e gestire gli argomenti di importazione Confluent Cloud. Per creare un argomento standard, consulta Creare un argomento standard.

Per saperne di più sugli argomenti di importazione, consulta Informazioni sugli argomenti di importazione.

Prima di iniziare

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare e gestire gli argomenti di importazione di Confluent Cloud, chiedi all'amministratore di concederti il ruolo IAM Editor Pub/Sub (roles/pubsub.editor) per il tuo argomento o progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare e gestire gli argomenti di importazione di Confluent Cloud. Per visualizzare le autorizzazioni esatte richieste, espandi la sezione Autorizzazioni richieste:

Autorizzazioni obbligatorie

Per creare e gestire gli argomenti di importazione di Confluent Cloud sono necessarie le seguenti autorizzazioni:

  • Crea un argomento di importazione: pubsub.topics.create
  • Elimina un argomento di importazione: pubsub.topics.delete
  • Ricevi un argomento di importazione: pubsub.topics.get
  • Elenca un argomento di importazione: pubsub.topics.list
  • Pubblica in un argomento di importazione: pubsub.topics.publish
  • Aggiorna un argomento di importazione: pubsub.topics.update
  • Recupera il criterio IAM per un argomento di importazione: pubsub.topics.getIamPolicy
  • Configura il criterio IAM per un argomento di importazione: pubsub.topics.setIamPolicy

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Puoi configurare il controllo dell'accesso a livello di progetto e di singola risorsa.

Configurare l'identità federata per accedere a Confluent Cloud

La federazione delle identità per i workload consente ai Google Cloud servizi di accedere ai workload in esecuzione al di fuori di Google Cloud. Con la federazione delle identità, non è necessario gestire o trasmettere le credenziali per Google Cloud accedere alle risorse in altri cloud. In alternativa, puoi utilizzare le identità dei carichi di lavoro stessi per autenticarti Google Cloud e accedere alle risorse.

Crea un account di servizio in Google Cloud

Questo passaggio è facoltativo. Se hai già un account di servizio, puoi utilizzarlo in questa procedura anziché crearne uno nuovo. Se utilizzi un account di servizio esistente, vai a Registra l'ID univoco dell'account di servizio per il passaggio successivo.

Per gli argomenti di importazione di Confluent Cloud, Pub/Sub utilizza l'account di servizio come identità per accedere alle risorse di Confluent Cloud.

Per ulteriori informazioni sulla creazione di un account di servizio, inclusi i prerequisiti, i ruoli e le autorizzazioni richiesti e le linee guida per la denominazione, consulta Creare account di servizio. Dopo aver creato un account di servizio, potrebbe essere necessario attendere almeno 60 secondi prima di utilizzarlo. Questo comportamento si verifica perché le operazioni di lettura sono infine coerenti; può essere necessario del tempo prima che il nuovo account di servizio sia visibile.

Registra l'ID univoco dell'account di servizio

Per configurare il provider di identità e il pool nella console Confluent Cloud, devi disporre di un ID univoco dell'account di servizio.

  1. Nella console Google Cloud, vai alla pagina dei dettagli dell'account di servizio.

    Vai all'account di servizio

  2. Fai clic sull'account di servizio appena creato o su quello che hai intenzione di utilizzare.

  3. Nella pagina Dettagli account di servizio, registra il numero ID univoco.

    L'ID è necessario come parte del flusso di lavoro per configurare il provider di identità e il pool nella console Confluent Cloud.

Aggiungi il ruolo Creatore token account di servizio all'account di servizio Pub/Sub

Il ruolo Creatore token account di servizio (roles/iam.serviceAccountTokenCreator) consente ai principali di creare credenziali di breve durata per un account di servizio. Questi token o queste credenziali vengono utilizzati per simulare l'identità dell'account di servizio.

Per ulteriori informazioni sull'impersonificazione degli account di servizio, consulta Impersonificazione degli account di servizio.

Puoi anche aggiungere il ruolo Publisher Pub/Sub (roles/pubsub.publisher) durante questa procedura. Per ulteriori informazioni sul ruolo e sul motivo per cui lo aggiungi, consulta Aggiungere il ruolo Publisher Pub/Sub all'account di servizio Pub/Sub.

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Fai clic sulla casella di controllo Includi concessioni di ruoli fornite da Google.

  3. Cerca l'account di servizio con il formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Per questo account di servizio, fai clic sul pulsante Modifica entità.

  5. Se necessario, fai clic su Aggiungi un altro ruolo.

  6. Cerca e fai clic sul ruolo Creatore token account di servizio (roles/iam.serviceAccountTokenCreator).

  7. Fai clic su Salva.

Creare un provider di identità in Confluent Cloud

Per autenticarsi a Confluent Cloud, l'account di servizio Google Cloud deve avere un pool di identità. Devi prima creare un provider di identità in Confluent Cloud.

Per ulteriori informazioni sulla creazione di un provider di identità in Confluent Cloud, visita la pagina Aggiungere un provider di identità OAuth/OIDC.

  1. Accedi alla console Confluent Cloud.

  2. Nel menu, fai clic su Account e accesso.

  3. Fai clic su Workload Identity.

  4. Fai clic su Aggiungi provider.

  5. Fai clic su OAuth/OIDC e poi su Avanti.

  6. Fai clic su Altro provider OIDC e poi su Avanti.

  7. Fornisci un nome e una descrizione dello scopo del provider di identità.

  8. Fai clic su Mostra configurazione avanzata.

  9. Nel campo URI emittente, inserisci https://accounts.google.com.

  10. Nel campo URI JWKS, inserisci https://www.googleapis.com/oauth2/v3/certs.

  11. Fai clic su Convalida e salva.

Crea un pool di identità e concedi i ruoli appropriati in Confluent Cloud

Devi creare un pool di identità nel tuo profilo identità e concedere i ruoli necessari per consentire all'account di servizio Pub/Sub di autenticarsi e leggere dagli argomenti Kafka di Confluent Cloud.

Assicurati che il cluster sia creato in Confluent Cloud prima di procedere con la creazione di un pool di identità.

Per ulteriori informazioni su come creare un pool di identità, consulta la pagina Utilizzare i pool di identità con il tuo provider di identità OAuth/OIDC.

  1. Accedi alla console Confluent Cloud.

  2. Nel menu, fai clic su Account e accesso.

  3. Fai clic su Workload Identity.

  4. Fai clic sul provider di identità che hai creato in Creare un provider di identità in Confluent Cloud.

  5. Fai clic su Aggiungi pool.

  6. Fornisci un nome e una descrizione per il pool di identità.

  7. Imposta Claim di identità su claims.

  8. In Imposta filtri, fai clic sulla scheda Avanzate. Inserisci il seguente codice:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    Sostituisci <SERVICE_ACCOUNT_UNIQUE_ID> con l'ID univoco del tuo account di servizio indicato in Registrare l'ID univoco dell'account di servizio.

  9. Fai clic su Avanti.

  10. Fai clic su Aggiungi nuova autorizzazione. poi fai clic su Avanti.

  11. Nel cluster pertinente, fai clic su Aggiungi assegnazione del ruolo.

  12. Fai clic sul ruolo Operatore e poi su Aggiungi.

    Questo ruolo concede Pub/Sub. Accesso dell'account di servizio al cluster contenente l'argomento Confluent Kafka che vuoi importare in Pub/Sub.

  13. Sotto il cluster, fai clic su Argomenti. Poi, fai clic su Aggiungi assegnazione del ruolo.

  14. Seleziona il ruolo DeveloperRead.

  15. Fai clic sull'opzione appropriata e specifica l'argomento o il prefisso. Ad esempio, Argomento specifico, Regola prefisso o Tutti gli argomenti.

  16. Fai clic su Aggiungi.

  17. Fai clic su Avanti.

  18. Fai clic su Convalida e salva.

Aggiungi il ruolo Publisher Pub/Sub all'entità Pub/Sub

Per attivare la pubblicazione, devi assegnare un ruolo di publisher all'account di servizio Pub/Sub in modo che Pub/Sub possa pubblicare nell'argomento di importazione di Confluent Cloud.

Attivare la pubblicazione da tutti gli argomenti

Utilizza questo metodo se non hai creato argomenti di importazione Confluent Cloud.

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Fai clic sulla casella di controllo Includi concessioni di ruoli fornite da Google.

  3. Cerca l'account di servizio con il formatoservice-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Per questo account di servizio, fai clic sul pulsante Modifica entità.

  5. Se necessario, fai clic su Aggiungi un altro ruolo.

  6. Cerca e fai clic sul ruolo Publisher Pub/Sub (roles/pubsub.publisher).

  7. Fai clic su Salva.

Attivare la pubblicazione da un singolo argomento

Utilizza questo metodo solo se l'argomento di importazione Confluent Cloud esiste già.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Sostituisci quanto segue:

    • TOPIC_ID: l'ID dell'argomento di importazione di Confluent Cloud.

    • PROJECT_NUMBER: il numero del progetto. Per visualizzare il numero del progetto, consulta Identificazione dei progetti.

Aggiungi il ruolo Utente account di servizio all'account di servizio

Il ruolo Utente account di servizio (roles/iam.serviceAccountUser) include l'autorizzazione iam.serviceAccounts.actAs che consente a un'entità di collegare un account di servizio alle impostazioni di importazione dell'argomento Confluent Cloud e di utilizzarlo per l'identità federata.

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Per l'entità che emette le chiamate di creazione o aggiornamento degli argomenti, fai clic sul pulsante Modifica entità.

  3. Se necessario, fai clic su Aggiungi un altro ruolo.

  4. Cerca e fai clic sul ruolo Utente account di servizio (roles/iam.serviceAccountUser).

  5. Fai clic su Salva.

Utilizzare gli argomenti di importazione di Confluent Cloud

Puoi creare un nuovo argomento di importazione o modificarne uno esistente.

Considerazioni

  • La creazione separata dell'argomento e della sottoscrizione, anche se eseguita in rapida successione, può comportare la perdita di dati. Esiste un breve periodo di tempo in cui l'argomento esiste senza un abbonamento. Se durante questo periodo vengono inviati dati all'argomento, andranno persi. Se crei prima l'argomento, poi la sottoscrizione e infine converti l'argomento in un argomento di importazione, garantisci che nessun messaggio venga perso durante il processo di importazione.

  • Se devi ricreare l'argomento Kafka di un argomento di importazione esistente con lo stesso nome, non puoi semplicemente eliminare l'argomento Kafka e ricrearlo. Questa azione può invalidare la gestione dell'offset di Pub/Sub, il che può comportare la perdita di dati. Per attenuare il problema, segui questi passaggi:

    • Elimina l'argomento di importazione Pub/Sub.
    • Elimina l'argomento Kafka.
    • Crea l'argomento Kafka.
    • Crea l'argomento di importazione Pub/Sub.
  • I dati di un argomento Kafka di Confluent Cloud vengono sempre letti dall'offset minimo.

Crea un argomento di importazione Confluent Cloud

Per saperne di più sulle proprietà associate a un argomento, consulta Proprietà di un argomento.

Assicurati di aver completato le seguenti procedure:

Per creare un argomento di importazione Confluent Cloud:

Console

  1. Nella console Google Cloud, vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci un ID per l'argomento di importazione.

    Per ulteriori informazioni sugli argomenti relativi alla denominazione, consulta le linee guida per i nomi.

  4. Seleziona Aggiungi una sottoscrizione predefinita.

  5. Seleziona Attiva importazione.

  6. Per l'origine di importazione, seleziona Confluent Cloud.

  7. Inserisci i seguenti dettagli:

    1. Server bootstrap: il server bootstrap del tuo cluster contenente l'argomento Kafka che stai importando in Pub/Sub. Il formato è il seguente: hostname:port.

    2. ID cluster: l'ID del cluster contenente l'argomento Kafka che stai importando in Pub/Sub.

    3. Argomento: il nome dell'argomento Kafka che stai importando in Pub/Sub.

    4. ID pool di identità: l'ID del pool di identità utilizzato per autenticarti con Confluent Cloud.

    5. Account di servizio:l'account di servizio che hai creato in Creare un account di servizio in Google Cloud.

  8. Fai clic su Crea argomento.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
        --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
        --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
        --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
        --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
        --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Sostituisci quanto segue:

    • TOPIC_ID: il nome o l'ID del tuo argomento Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: il server bootstrap del tuo cluster contenente l'argomento Kafka che stai importando in Pub/Sub. Il formato è il seguente: hostname:port.
    • CONFLUENT_CLUSTER_ID: l'ID del cluster contenente l'argomento Kafka che stai importando in Pub/Sub.
    • CONFLUENT_TOPIC: il nome dell'argomento Kafka che stai importando in Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: l'ID del pool di identità utilizzato per l'autenticazione con Confluent Cloud.
    • PUBSUB_SERVICE_ACCOUNT: l'account di servizio che hai creato in Creare un account di servizio in Google Cloud.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceConfluentCloud{
				BootstrapServer:   bootstrapServer,
				ClusterID:         clusterID,
				Topic:             confluentTopic,
				IdentityPoolID:    poolID,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Python.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Se riscontri problemi, consulta l'argomento sulla risoluzione dei problemi di un argomento di importazione Confluent Cloud.

Modificare un argomento di importazione di Confluent Cloud Hubs

Per modificare le impostazioni dell'origine dati di importazione di un argomento di importazione Confluent Cloud:

Console

  1. Nella console Google Cloud, vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic sull'argomento di importazione di Confluent Cloud.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Aggiorna i campi che vuoi modificare.

  5. Fai clic su Aggiorna.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    Per evitare di perdere le impostazioni per l'argomento di importazione, assicurati di includerle tutte ogni volta che aggiorni l'argomento. Se ne ometti una, Pub/Sub reimposta l'impostazione sul valore predefinito originale.

  2. Esegui il comando gcloud pubsub topics update con tutti i flag menzionati nel seguente esempio:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Sostituisci quanto segue:

    • TOPIC_ID: il nome o l'ID del tuo argomento Pub/Sub.
    • CONFLUENT_BOOTSTRAP_SERVER: il server bootstrap del tuo cluster contenente l'argomento Kafka che stai importando in Pub/Sub. Il formato è il seguente: hostname:port.
    • CONFLUENT_CLUSTER_ID: l'ID del cluster contenente l'argomento Kafka che stai importando in Pub/Sub
    • CONFLUENT_TOPIC: il nome dell'argomento Kafka che stai importando in Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: l'ID del pool di identità utilizzato per l'autenticazione con Confluent Cloud.
    • CONFLUENT_IDENTITY_POOL_ID: l'account di servizio che hai creato in Creare un account di servizio in Google Cloud.

Quote e limiti

Il throughput del publisher per gli argomenti di importazione è vincolato dalla quota di pubblicazione dell'argomento. Per ulteriori informazioni, consulta Quote e limiti di Pub/Sub.

Passaggi successivi

Apache Kafka® è un marchio registrato di The Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.