Mengubah jenis topik

Anda dapat mengonversi topik impor menjadi topik standar atau sebaliknya, topik standar menjadi topik impor.

Mengonversi topik impor menjadi topik standar

Untuk mengonversi topik impor menjadi topik standar, hapus setelan penyerapan. Lakukan langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik topik impor.

  3. Di halaman detail topik, klik Edit.

  4. Hapus centang pada opsi Aktifkan penyerapan.

  5. Klik Perbarui.

gcloud

  1. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  2. Jalankan perintah gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Ganti TOPIC_ID dengan ID topik.

Mengonversi topik standar ke topik impor Amazon Kinesis Data Streams

Untuk mengonversi topik standar ke topik impor Amazon Kinesis Data Streams, periksa terlebih dahulu apakah Anda memenuhi semua prasyarat.

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik topik yang ingin Anda konversi menjadi topik impor.

  3. Di halaman detail topik, klik Edit.

  4. Pilih opsi Aktifkan penyerapan.

  5. Untuk sumber penyerapan, pilih Amazon Kinesis Data Streams.

  6. Masukkan detail berikut:

    • ARN Aliran Data Kinesis: ARN untuk Aliran Data Kinesis yang Anda rencanakan untuk diserap ke Pub/Sub. Format ARN adalah sebagai berikut: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN Konsumen Kinesis: ARN resource konsumen yang terdaftar ke AWS Kinesis Data Stream. Format ARN adalah sebagai berikut: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • AWS Role ARN: ARN peran AWS. Format ARN peran adalah sebagai berikut: arn:aws:iam:${Account}:role/${RoleName}.

    • Akun layanan: Akun layanan yang Anda buat di Membuat akun layanan di Google Cloud.

  7. Klik Perbarui.

gcloud

  1. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  2. Jalankan perintah gcloud pubsub topics update dengan semua flag yang disebutkan dalam contoh berikut:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Ganti kode berikut:

    • TOPIC_ID adalah ID atau nama topik. Kolom ini tidak dapat diperbarui.

    • KINESIS_STREAM_ARN adalah ARN untuk Kinesis Data Streams yang ingin Anda serap ke Pub/Sub. Format ARN adalah sebagai berikut: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN adalah ARN resource konsumen yang terdaftar ke AWS Kinesis Data Streams. Format ARN adalah sebagai berikut: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN adalah ARN peran AWS. Format ARN peran adalah sebagai berikut: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT adalah akun layanan yang Anda buat di Membuat akun layanan di Google Cloud.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Untuk mengetahui informasi selengkapnya tentang ARN, lihat Amazon Resource Name (ARN) dan ID IAM.

Mengonversi topik standar menjadi topik impor Cloud Storage

Untuk mengonversi topik standar menjadi topik impor Cloud Storage, periksa terlebih dahulu apakah Anda memenuhi semua prasyarat.

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik topik yang ingin Anda konversi menjadi topik impor Cloud Storage.

  3. Di halaman detail topik, klik Edit.

  4. Pilih opsi Aktifkan penyerapan.

  5. Untuk sumber transfer, pilih Google Cloud Storage.

  6. Untuk bucket Cloud Storage, klik Browse.

    Halaman Select bucket akan terbuka. Pilih salah satu opsi berikut:

    • Pilih bucket yang ada dari project yang sesuai.

    • Klik ikon buat dan ikuti petunjuk di layar untuk membuat bucket baru. Setelah membuat bucket, pilih bucket untuk topik impor Cloud Storage.

  7. Saat Anda menentukan bucket, Pub/Sub akan memeriksa izin yang sesuai di bucket untuk akun layanan Pub/Sub. Jika ada masalah izin, Anda akan melihat pesan error terkait izin.

    Jika Anda mengalami masalah izin, klik Set permissions. Untuk informasi selengkapnya, lihat Memberikan izin Cloud Storage ke akun layanan Pub/Sub.

  8. Untuk Object format, pilih Text, Avro, atau Pub/Sub Avro.

    Jika memilih Text, Anda dapat menentukan Delimiter secara opsional untuk memisahkan objek menjadi pesan.

    Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.

  9. Opsional. Anda dapat menentukan Waktu pembuatan objek minimum untuk topik Anda. Jika ditetapkan, hanya objek yang dibuat setelah waktu pembuatan objek minimum yang akan ditransfer.

    Untuk informasi selengkapnya, lihat Waktu pembuatan objek minimum.

  10. Anda harus menentukan Pola glob. Untuk menyerap semua objek dalam bucket, gunakan ** sebagai pola glob. Hanya objek yang cocok dengan pola yang diberikan yang akan diserap.

    Untuk mengetahui informasi selengkapnya, lihat Mencocokkan pola glob.

  11. Pertahankan setelan default lainnya.
  12. Klik Perbarui topik.

gcloud

  1. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  2. Agar tidak kehilangan setelan untuk topik impor, pastikan untuk menyertakan semuanya setiap kali Anda memperbarui topik. Jika Anda mengabaikan sesuatu, Pub/Sub akan mereset setelan ke nilai default aslinya.

    Jalankan perintah gcloud pubsub topics update dengan semua flag yang disebutkan dalam contoh berikut:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ganti kode berikut:

    • TOPIC_ID adalah ID atau nama topik. Kolom ini tidak dapat diperbarui.

    • BUCKET_NAME: Menentukan nama bucket yang ada. Contoh, prod_bucket. Nama bucket tidak boleh menyertakan project ID. Untuk membuat bucket, lihat Membuat bucket.

    • INPUT_FORMAT: Menentukan format objek yang ditransfer. Ini dapat berupa text, avro, atau pubsub_avro. Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.

    • TEXT_DELIMITER: Menentukan pemisah untuk memisahkan objek teks menjadi pesan Pub/Sub. Kolom ini harus berupa satu karakter dan hanya boleh ditetapkan jika INPUT_FORMAT adalah text. Nilai defaultnya adalah karakter baris baru (\n).

      Saat menggunakan gcloud CLI untuk menentukan pemisah, perhatikan dengan cermat penanganan karakter khusus seperti baris baru \n. Gunakan format '\n' untuk memastikan pemisah ditafsirkan dengan benar. Cukup menggunakan \n tanpa tanda kutip atau hasil escape akan menghasilkan pemisah "n".

    • MINIMUM_OBJECT_CREATE_TIME: Menentukan waktu minimum saat objek dibuat agar dapat diserap. Format ini harus dalam UTC dalam format YYYY-MM-DDThh:mm:ssZ. Misalnya, 2024-10-14T08:30:30Z.

      Tanggal apa pun, baik masa lalu maupun masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

    • MATCH_GLOB: Menentukan pola glob yang akan dicocokkan agar objek dapat ditransfer. Saat Anda menggunakan gcloud CLI, glob pencocokan dengan karakter * harus memiliki karakter * yang diformat sebagai di-escape dalam bentuk \*\*.txt atau seluruh glob pencocokan harus dalam tanda kutip "**.txt" atau '**.txt'. Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.