Crear un tema con la ingestión de Confluent Cloud

Crear un tema con la ingestión de Confluent Cloud

Investigar más

Para obtener documentación detallada que incluya este código de muestra, consulta lo siguiente:

Código de ejemplo

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API C++ Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API C# Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


using Google.Cloud.PubSub.V1;
using System;

public class CreateTopicWithConfluentCloudIngestionSample
{
    public Topic CreateTopicWithConfluentCloudIngestion(string projectId, string topicId, string bootstrapServer, string clusterId, string confluentTopic, string identityPoolId, string gcpServiceAccount)
    {
        // Define settings for Confluent Cloud ingestion
        IngestionDataSourceSettings ingestionDataSourceSettings = new IngestionDataSourceSettings
        {
            ConfluentCloud = new IngestionDataSourceSettings.Types.ConfluentCloud
            {
                BootstrapServer = bootstrapServer,
                ClusterId = clusterId,
                Topic = confluentTopic,
                IdentityPoolId = identityPoolId,
                GcpServiceAccount = gcpServiceAccount
            }
        };

        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        Topic topic = new Topic()
        {
            Name = TopicName.FormatProjectTopic(projectId, topicId),
            IngestionDataSourceSettings = ingestionDataSourceSettings
        };
        Topic createdTopic = publisher.CreateTopic(topic);
        Console.WriteLine($"Topic {topic.Name} created.");

        return createdTopic;
    }
}

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceConfluentCloud{
				BootstrapServer:   bootstrapServer,
				ClusterID:         clusterID,
				Topic:             confluentTopic,
				IdentityPoolID:    poolID,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Java Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Node.js Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Node.js de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string,
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API PHP Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub topic with Confluent Cloud ingestion.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $bootstrapServer  The address of the bootstrap server. The format is url:port.
 * @param string $clusterId  The id of the cluster.
 * @param string $confluentTopic  The name of the topic in the Confluent Cloud cluster that Pub/Sub will import from.
 * @param string $identityPoolId  The id of the identity pool to be used for Federated Identity authentication with Confluent Cloud.
 * @param string $gcpServiceAccount  The GCP service account to be used for Federated Identity authentication with identity_pool_id.
 */
function create_topic_with_confluent_cloud_ingestion(
    string $projectId,
    string $topicName,
    string $bootstrapServer,
    string $clusterId,
    string $confluentTopic,
    string $identityPoolId,
    string $gcpServiceAccount
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->createTopic($topicName, [
        'ingestionDataSourceSettings' => [
            'confluent_cloud' => [
                'bootstrap_server' => $bootstrapServer,
                'cluster_id' => $clusterId,
                'topic' => $confluentTopic,
                'identity_pool_id' => $identityPoolId,
                'gcp_service_account' => $gcpServiceAccount
            ]
        ]
    ]);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Python Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Ruby Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

# topic_id = "your-topic-id"
# bootstrap_server = "bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"
# cluster_id = "cluster-id"
# confluent_topic = "confluent-topic-name"
# identity_pool_id = "identity-pool-id"
# gcp_service_account = "service-account@project.iam.gserviceaccount.com"

pubsub = Google::Cloud::Pubsub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 ingestion_data_source_settings: {
                                   confluent_cloud: {
                                     bootstrap_server: bootstrap_server,
                                     cluster_id: cluster_id,
                                     topic: confluent_topic,
                                     identity_pool_id: identity_pool_id,
                                     gcp_service_account: gcp_service_account
                                   }
                                 }

puts "Topic with Confluent Cloud Ingestion #{topic.name} created."

Siguientes pasos

Para buscar y filtrar ejemplos de código de otros Google Cloud productos, consulta el Google Cloud navegador de ejemplos.