Membuat topik impor Confluent Cloud

Topik impor Confluent Cloud memungkinkan Anda terus menyerap data dari Confluent Cloud sebagai sumber eksternal dan ke Pub/Sub. Kemudian, Anda dapat melakukan streaming data ke salah satu tujuan yang didukung Pub/Sub.

Dokumen ini menunjukkan cara membuat dan mengelola topik impor Confluent Cloud. Untuk membuat topik standar, lihat Membuat topik standar.

Untuk mengetahui informasi selengkapnya tentang topik impor, lihat Tentang topik impor.

Sebelum memulai

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk membuat dan mengelola topik impor Confluent Cloud, minta administrator untuk memberi Anda peran IAM Pub/Sub Editor (roles/pubsub.editor) pada topik atau project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat dan mengelola topik impor Confluent Cloud. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat dan mengelola topik impor Confluent Cloud:

  • Buat topik impor: pubsub.topics.create
  • Menghapus topik impor: pubsub.topics.delete
  • Mendapatkan topik impor: pubsub.topics.get
  • Cantumkan topik impor: pubsub.topics.list
  • Memublikasikan ke topik impor: pubsub.topics.publish
  • Memperbarui topik impor: pubsub.topics.update
  • Dapatkan kebijakan IAM untuk topik impor: pubsub.topics.getIamPolicy
  • Konfigurasikan kebijakan IAM untuk topik impor: pubsub.topics.setIamPolicy

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Anda dapat mengonfigurasi kontrol akses di tingkat project dan tingkat resource individual.

Menyiapkan identitas gabungan untuk mengakses Confluent Cloud

Workload Identity Federation memungkinkan Google Cloud layanan mengakses workload yang berjalan di luar Google Cloud. Dengan federasi identitas, Anda tidak perlu mempertahankan atau meneruskan kredensial ke Google Cloud untuk mengakses resource di cloud lain. Sebagai gantinya, Anda dapat menggunakan identitas workload itu sendiri untuk melakukan autentikasi ke Google Cloud dan mengakses resource.

Membuat akun layanan di Google Cloud

Ini langkah opsional. Jika sudah memiliki akun layanan, Anda dapat menggunakannya dalam prosedur ini, bukan membuat akun layanan baru. Jika Anda menggunakan akun layanan yang ada, buka Mencatat ID unik akun layanan untuk langkah berikutnya.

Untuk topik impor Confluent Cloud, Pub/Sub menggunakan akun layanan sebagai identitas untuk mengakses resource dari Confluent Cloud.

Untuk mengetahui informasi selengkapnya tentang cara membuat akun layanan, termasuk prasyarat, peran dan izin yang diperlukan, serta panduan penamaan, lihat Membuat akun layanan. Setelah membuat akun layanan, Anda mungkin perlu menunggu selama 60 detik atau lebih sebelum menggunakan akun layanan. Perilaku ini terjadi karena operasi baca pada akhirnya konsisten; perlu waktu hingga akun layanan baru terlihat.

Catat ID unik akun layanan

Anda memerlukan ID unik akun layanan untuk menyiapkan penyedia identitas dan kumpulan di konsol Confluent Cloud.

  1. Di konsol Google Cloud, buka halaman detail Service account.

    Buka akun layanan

  2. Klik akun layanan yang baru saja Anda buat atau yang ingin Anda gunakan.

  3. Dari halaman Detail akun layanan, catat nomor ID Unik.

    Anda memerlukan ID sebagai bagian dari alur kerja untuk menyiapkan penyedia dan kumpulan identitas di konsol Confluent Cloud.

Menambahkan peran pembuat token akun layanan ke akun layanan Pub/Sub

Peran pembuat token akun layanan (roles/iam.serviceAccountTokenCreator) memungkinkan akun utama membuat kredensial jangka pendek untuk akun layanan. Token atau kredensial ini digunakan untuk meniru identitas akun layanan.

Untuk informasi selengkapnya tentang peniruan akun layanan, lihat Peniruan akun layanan.

Anda juga dapat menambahkan peran penayang Pub/Sub (roles/pubsub.publisher) selama prosedur ini. Untuk mengetahui informasi selengkapnya tentang peran dan alasan Anda menambahkannya, lihat Menambahkan peran penayang Pub/Sub ke akun layanan Pub/Sub.

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Klik kotak centang Sertakan pemberian peran yang disediakan Google.

  3. Cari akun layanan yang memiliki format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Untuk akun layanan ini, klik tombol Edit Principal.

  5. Jika diperlukan, klik Tambahkan peran lain.

  6. Telusuri dan klik Peran pembuat token akun layanan (roles/iam.serviceAccountTokenCreator).

  7. Klik Simpan.

Membuat penyedia identitas di Confluent Cloud

Untuk melakukan autentikasi ke Confluent Cloud, Akun Layanan Google Cloud memerlukan kumpulan identitas. Anda harus membuat penyedia identitas di Confluent Cloud terlebih dahulu.

Untuk informasi selengkapnya tentang cara membuat penyedia identitas di Confluent Cloud, buka halaman Menambahkan Penyedia Identitas OAuth/OIDC.

  1. Login ke konsol Confluent Cloud.

  2. Di menu, klik Accounts & Access.

  3. Klik Workload identity.

  4. Klik Tambahkan penyedia.

  5. Klik OAuth/OIDC, lalu klik Next.

  6. Klik Penyedia OIDC Lainnya, lalu klik Berikutnya.

  7. Berikan nama dan deskripsi tujuan penyedia identitas.

  8. Klik Show advanced configuration.

  9. Di kolom Issuer URI, masukkan https://accounts.google.com.

  10. Di kolom JWKS URI, masukkan https://www.googleapis.com/oauth2/v3/certs.

  11. Klik Validasi & Simpan.

Membuat kumpulan identitas dan memberikan peran yang sesuai di Confluent Cloud

Anda harus membuat kumpulan identitas di profil identitas dan memberikan peran yang diperlukan untuk mengizinkan Akun Layanan Pub/Sub melakukan autentikasi dan membaca dari topik Confluent Cloud Kafka.

Pastikan cluster Anda dibuat di Confluent Cloud sebelum melanjutkan pembuatan kumpulan identitas.

Untuk informasi selengkapnya tentang cara membuat kumpulan identitas, buka halaman Menggunakan Kumpulan Identitas dengan Penyedia Identitas OAuth/OIDC Anda.

  1. Login ke konsol Confluent Cloud.

  2. Di menu, klik Accounts & Access.

  3. Klik Workload identity.

  4. Klik penyedia identitas yang Anda buat di Membuat penyedia identitas di Confluent Cloud.

  5. Klik Tambahkan kumpulan.

  6. Berikan nama dan deskripsi untuk kumpulan identitas Anda.

  7. Tetapkan Identity claim ke claims.

  8. Di bagian Tetapkan filter, klik tab Lanjutan. Masukkan kode berikut:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    Ganti <SERVICE_ACCOUNT_UNIQUE_ID> dengan ID unik akun layanan Anda yang ditemukan di Mencatat ID unik akun layanan.

  9. Klik Berikutnya.

  10. Klik Tambahkan izin baru. Kemudian, klik Berikutnya.

  11. Di cluster yang relevan, klik Tambahkan penetapan peran.

  12. Klik peran Operator, lalu klik Tambahkan.

    Peran ini memberikan Pub/Sub. Akses Akun Layanan ke cluster yang berisi topik Confluent Kafka yang ingin Anda serap ke Pub/Sub.

  13. Di bagian cluster, klik Topik. Kemudian, klik Tambahkan penetapan peran.

  14. Pilih peran DeveloperRead.

  15. Klik opsi yang sesuai dan tentukan topik atau awalan. Misalnya, Topik tertentu, Aturan awalan, atau Semua topik.

  16. Klik Tambahkan.

  17. Klik Berikutnya.

  18. Klik Validasi & Simpan.

Menambahkan peran penayang Pub/Sub ke akun utama Pub/Sub

Untuk mengaktifkan publikasi, Anda harus menetapkan peran penayang ke akun layanan Pub/Sub agar Pub/Sub dapat memublikasikan ke topik impor Confluent Cloud.

Mengaktifkan publikasi dari semua topik

Gunakan metode ini jika Anda belum membuat topik impor Confluent Cloud.

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Klik kotak centang Sertakan pemberian peran yang disediakan Google.

  3. Cari akun layanan yang memiliki format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Untuk akun layanan ini, klik tombol Edit Principal.

  5. Jika diperlukan, klik Tambahkan peran lain.

  6. Telusuri dan klik Peran penayang Pub/Sub (roles/pubsub.publisher).

  7. Klik Simpan.

Mengaktifkan publikasi dari satu topik

Gunakan metode ini hanya jika topik impor Confluent Cloud sudah ada.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Ganti kode berikut:

    • TOPIC_ID: ID topik topik impor Confluent Cloud.

    • PROJECT_NUMBER: nomor project. Untuk melihat nomor project, lihat Mengidentifikasi project.

Menambahkan peran pengguna akun layanan ke akun layanan

Peran Pengguna Akun Layanan (roles/iam.serviceAccountUser) mencakup izin iam.serviceAccounts.actAs yang memungkinkan akun utama melampirkan akun layanan ke setelan penyerapan topik impor Confluent Cloud dan menggunakan akun layanan tersebut untuk identitas gabungan.

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Untuk akun utama yang mengeluarkan panggilan pembuatan atau pembaruan topik, klik tombol Edit Principal.

  3. Jika diperlukan, klik Tambahkan peran lain.

  4. Telusuri dan klik Peran pengguna akun layanan (roles/iam.serviceAccountUser).

  5. Klik Simpan.

Menggunakan topik impor Confluent Cloud

Anda dapat membuat topik impor baru atau mengedit topik yang ada.

Pertimbangan

  • Membuat topik dan langganan secara terpisah, meskipun dilakukan secara cepat, dapat menyebabkan hilangnya data. Ada periode singkat saat topik tersedia tanpa langganan. Jika ada data yang dikirim ke topik selama waktu ini, data tersebut akan hilang. Dengan membuat topik terlebih dahulu, membuat langganan, lalu mengonversi topik menjadi topik impor, Anda menjamin bahwa tidak ada pesan yang terlewat selama proses impor.

  • Jika perlu membuat ulang topik Kafka dari topik impor yang ada dengan nama yang sama, Anda tidak dapat hanya menghapus topik Kafka dan membuatnya ulang. Tindakan ini dapat membatalkan validasi pengelolaan offset Pub/Sub, yang dapat menyebabkan hilangnya data. Untuk mengurangi hal ini, ikuti langkah-langkah berikut:

    • Hapus topik impor Pub/Sub.
    • Hapus topik Kafka.
    • Buat topik Kafka.
    • Buat topik impor Pub/Sub.
  • Data dari topik Confluent Cloud Kafka selalu dibaca dari offset paling awal.

Membuat topik impor Confluent Cloud

Untuk mengetahui lebih lanjut properti yang terkait dengan topik, lihat Properti topik.

Pastikan Anda telah menyelesaikan prosedur berikut:

Untuk membuat topik impor Confluent Cloud, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik Create topic.

  3. Di kolom ID Topik, masukkan ID untuk topik impor Anda.

    Untuk informasi selengkapnya tentang penamaan topik, lihat panduan penamaan.

  4. Pilih Tambahkan langganan default.

  5. Pilih Aktifkan penyerapan.

  6. Untuk sumber transfer, pilih Confluent Cloud.

  7. Masukkan detail berikut:

    1. Server Bootstrap: Server bootstrap cluster Anda yang berisi topik Kafka yang Anda transfer ke Pub/Sub. Formatnya adalah sebagai berikut: hostname:port.

    2. ID Cluster: ID cluster yang berisi topik Kafka yang ditransfer ke Pub/Sub.

    3. Topic: Nama topik Kafka yang Anda serap ke Pub/Sub.

    4. ID kumpulan identitas: ID kumpulan identitas yang digunakan untuk autentikasi dengan Confluent Cloud.

    5. Akun layanan: Akun layanan yang Anda buat di Membuat akun layanan di Google Cloud.

  8. Klik Create topic.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
        --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
        --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
        --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
        --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
        --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ganti kode berikut:

    • TOPIC_ID: nama atau ID topik Pub/Sub Anda.
    • CONFLUENT_BOOTSTRAP_SERVER: server bootstrap cluster Anda yang berisi topik Kafka yang Anda transfer ke Pub/Sub. Formatnya adalah sebagai berikut: hostname:port.
    • CONFLUENT_CLUSTER_ID: ID cluster Anda yang berisi topik Kafka yang Anda transfer ke Pub/Sub.
    • CONFLUENT_TOPIC: nama topik Kafka yang Anda transfer ke Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: ID kumpulan identitas yang digunakan untuk mengautentikasi dengan Confluent Cloud.
    • PUBSUB_SERVICE_ACCOUNT: Akun layanan yang Anda buat di Membuat akun layanan di Google Cloud.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // Confluent Cloud ingestion settings.
	// bootstrapServer := "bootstrap-server"
	// clusterID := "cluster-id"
	// confluentTopic := "confluent-topic"
	// poolID := "identity-pool-id"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceConfluentCloud{
				BootstrapServer:   bootstrapServer,
				ClusterID:         clusterID,
				Topic:             confluentTopic,
				IdentityPoolID:    poolID,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Confluent Cloud ingestion settings.
    String bootstrapServer = "bootstrap-server";
    String clusterId = "cluster-id";
    String confluentTopic = "confluent-topic";
    String identityPoolId = "identity-pool-id";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithConfluentCloudIngestionExample(
        projectId,
        topicId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount);
  }

  public static void createTopicWithConfluentCloudIngestionExample(
      String projectId,
      String topicId,
      String bootstrapServer,
      String clusterId,
      String confluentTopic,
      String identityPoolId,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.ConfluentCloud confluentCloud =
          IngestionDataSourceSettings.ConfluentCloud.newBuilder()
              .setBootstrapServer(bootstrapServer)
              .setClusterId(clusterId)
              .setTopic(confluentTopic)
              .setIdentityPoolId(identityPoolId)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId,
  bootstrapServer,
  clusterId,
  confluentTopic,
  identityPoolId,
  gcpServiceAccount
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bootstrap_server = "your-bootstrap-server"
# cluster_id = "your-cluster-id"
# confluent_topic = "your-confluent-topic"
# identity_pool_id = "your-identity-pool-id"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
            bootstrap_server=bootstrap_server,
            cluster_id=cluster_id,
            topic=confluent_topic,
            identity_pool_id=identity_pool_id,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& bootstrap_server,
   std::string const& cluster_id, std::string const& confluent_topic,
   std::string const& identity_pool_id,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                              ->mutable_confluent_cloud();
  confluent_cloud->set_bootstrap_server(bootstrap_server);
  confluent_cloud->set_cluster_id(cluster_id);
  confluent_cloud->set_topic(confluent_topic);
  confluent_cloud->set_identity_pool_id(identity_pool_id);
  confluent_cloud->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
  topicNameOrId: string,
  bootstrapServer: string,
  clusterId: string,
  confluentTopic: string,
  identityPoolId: string,
  gcpServiceAccount: string
) {
  // Creates a new topic with Confluent Cloud ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      confluentCloud: {
        bootstrapServer,
        clusterId,
        topic: confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}

Jika Anda mengalami masalah, lihat Memecahkan masalah topik impor Confluent Cloud.

Mengedit topik impor Confluent Cloud Hubs

Untuk mengedit setelan sumber data penyerapan topik impor Confluent Cloud, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik topik impor Confluent Cloud.

  3. Di halaman detail topik, klik Edit.

  4. Perbarui kolom yang ingin diubah.

  5. Klik Perbarui.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    Agar tidak kehilangan setelan untuk topik impor, pastikan untuk menyertakan semuanya setiap kali Anda memperbarui topik. Jika Anda tidak memasukkan sesuatu, Pub/Sub akan mereset setelan ke nilai default aslinya.

  2. Jalankan perintah gcloud pubsub topics update dengan semua flag yang disebutkan dalam contoh berikut:

    gcloud pubsub topics update TOPIC_ID \
       --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
       --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
       --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
       --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
       --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ganti kode berikut:

    • TOPIC_ID: nama atau ID topik Pub/Sub Anda.
    • CONFLUENT_BOOTSTRAP_SERVER: server bootstrap cluster Anda yang berisi topik Kafka yang Anda transfer ke Pub/Sub. Formatnya adalah sebagai berikut: hostname:port.
    • CONFLUENT_CLUSTER_ID: ID cluster Anda yang berisi topik Kafka yang Anda transfer ke Pub/Sub
    • CONFLUENT_TOPIC: Nama topik Kafka yang Anda transfer ke Pub/Sub.
    • CONFLUENT_IDENTITY_POOL_ID: ID kumpulan identitas yang digunakan untuk mengautentikasi dengan Confluent Cloud.
    • CONFLUENT_IDENTITY_POOL_ID: Akun layanan yang Anda buat di Membuat akun layanan di Google Cloud.

Kuota dan batas

Throughput penayang untuk topik impor dibatasi oleh kuota publikasi topik. Untuk mengetahui informasi selengkapnya, lihat Kuota dan batas Pub/Sub.

Langkah berikutnya

Apache Kafka® adalah merek dagang terdaftar dari Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lainnya.