Membuat topik impor Cloud Storage

Topik impor Cloud Storage memungkinkan Anda terus menyerap data dari Cloud Storage ke Pub/Sub. Kemudian, Anda dapat melakukan streaming data ke salah satu tujuan yang didukung Pub/Sub. Pub/Sub secara otomatis mendeteksi objek baru yang ditambahkan ke bucket Cloud Storage dan menyerapnya.

Cloud Storage adalah layanan untuk menyimpan objek di Google Cloud. Objek adalah bagian data yang tidak dapat diubah dan terdiri dari file dalam format apa pun. Anda menyimpan objek di penampung yang disebut bucket. Bucket juga dapat berisi folder terkelola, yang Anda gunakan untuk memberikan akses yang diperluas ke grup objek dengan awalan nama bersama.

Untuk mengetahui informasi selengkapnya tentang Cloud Storage, lihat dokumentasi Cloud Storage.

Untuk mengetahui informasi selengkapnya tentang topik impor, lihat Tentang topik impor.

Sebelum memulai

Peran dan izin yang diperlukan untuk mengelola topik impor Cloud Storage

Untuk mendapatkan izin yang Anda perlukan guna membuat dan mengelola topik impor Cloud Storage, minta administrator untuk memberi Anda peran IAM Pub/Sub Editor (roles/pubsub.editor) di topik atau project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat dan mengelola topik impor Cloud Storage. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat dan mengelola topik impor Cloud Storage:

  • Buat topik impor: pubsub.topics.create
  • Menghapus topik impor: pubsub.topics.delete
  • Mendapatkan topik impor: pubsub.topics.get
  • Cantumkan topik impor: pubsub.topics.list
  • Memublikasikan ke topik impor: pubsub.topics.publish
  • Memperbarui topik impor: pubsub.topics.update
  • Dapatkan kebijakan IAM untuk topik impor: pubsub.topics.getIamPolicy
  • Konfigurasikan kebijakan IAM untuk topik impor: pubsub.topics.setIamPolicy

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Anda dapat mengonfigurasi kontrol akses di tingkat project dan tingkat resource individual.

Kebijakan penyimpanan pesan mematuhi lokasi bucket

Kebijakan penyimpanan pesan topik Pub/Sub harus tumpang-tindih dengan region tempat bucket Cloud Storage Anda berada. Kebijakan ini menentukan tempat Pub/Sub diizinkan untuk menyimpan data pesan Anda.

  • Untuk bucket dengan jenis lokasi sebagai region: Kebijakan harus menyertakan region tertentu tersebut. Misalnya, jika bucket Anda berada di region us-central1, kebijakan penyimpanan pesan juga harus menyertakan us-central1.

  • Untuk bucket dengan jenis lokasi sebagai dual-region atau multi-region: Kebijakan harus menyertakan setidaknya satu region dalam lokasi dual-region atau multi-region. Misalnya, jika bucket Anda berada di US multi-region, kebijakan penyimpanan pesan dapat mencakup us-central1, us-east1, atau region lain dalam US multi-region.

    Jika kebijakan tidak menyertakan region bucket, pembuatan topik akan gagal. Misalnya, jika bucket Anda berada di europe-west1 dan kebijakan penyimpanan pesan Anda hanya menyertakan asia-east1, Anda akan menerima error.

    Jika kebijakan penyimpanan pesan hanya menyertakan satu region yang tumpang-tindih dengan lokasi bucket, redundansi multi-region mungkin akan terganggu. Hal ini karena jika satu region tersebut tidak tersedia, data Anda mungkin tidak dapat diakses. Untuk memastikan redundansi penuh, sebaiknya sertakan setidaknya dua region dalam kebijakan penyimpanan pesan yang merupakan bagian dari lokasi multi-region atau dual-region bucket.

Untuk mengetahui informasi selengkapnya tentang lokasi bucket, lihat dokumentasi.

Menambahkan peran penayang Pub/Sub ke akun layanan Pub/Sub

Anda harus menetapkan peran penayang Pub/Sub ke akun layanan Pub/Sub agar Pub/Sub dapat memublikasikan ke topik impor Cloud Storage.

Mengaktifkan publikasi ke semua topik impor Cloud Storage

Pilih opsi ini jika Anda tidak memiliki topik impor Cloud Storage yang tersedia di project.

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Aktifkan opsi Sertakan pemberian peran yang disediakan Google.

  3. Cari akun layanan Pub/Sub yang memiliki format:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Untuk akun layanan ini, klik tombol Edit Principal.

  5. Jika diperlukan, klik Tambahkan peran lain.

  6. Telusuri dan pilih Pub/Sub publisher role (roles/pubsub.publisher).

  7. Klik Simpan.

Mengaktifkan publikasi ke satu topik impor Cloud Storage

Jika Anda ingin memberikan izin kepada Pub/Sub untuk memublikasikan ke topik impor Cloud Storage tertentu yang sudah ada, ikuti langkah-langkah berikut:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Ganti kode berikut:

    • TOPIC_ID adalah ID atau nama topik impor Cloud Storage.

    • PROJECT_NUMBER adalah nomor project. Untuk melihat nomor project, lihat Mengidentifikasi project.

Menetapkan peran Cloud Storage ke akun layanan Pub/Sub

Untuk membuat topik impor Cloud Storage, akun layanan Pub/Sub harus memiliki izin untuk membaca dari bucket Cloud Storage tertentu. Izin berikut diperlukan:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Untuk menetapkan izin ini ke akun layanan Pub/Sub, pilih salah satu prosedur berikut:

  • Berikan izin di tingkat bucket. Pada bucket Cloud Storage tertentu, berikan peran Storage Legacy Object Reader (roles/storage.legacyObjectReader) dan Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) ke akun layanan Pub/Sub.

  • Jika harus memberikan peran di tingkat project, Anda dapat memberikan peran Storage Admin (roles/storage.admin) di project yang berisi bucket Cloud Storage. Berikan peran ini ke akun layanan Pub/Sub.

Izin bucket

Lakukan langkah-langkah berikut untuk memberikan peran Storage Legacy Object Reader (roles/storage.legacyObjectReader) dan Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) ke akun layanan Pub/Sub di tingkat bucket:

  1. Di konsol Google Cloud, buka halaman Cloud Storage.

    Buka Cloud Storage

  2. Klik bucket Cloud Storage tempat Anda ingin membaca pesan dan mengimpor ke topik impor Cloud Storage.

    Halaman Detail bucket akan terbuka.

  3. Di halaman Detail bucket, klik tab Permissions.

  4. Di tab Permissions > View by Principals, klik Grant access.

    Halaman Berikan akses akan terbuka.

  5. Di bagian Add Principals, masukkan nama akun layanan Pub/Sub Anda.

    Format akun layanan adalah service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Misalnya, untuk project dengan PROJECT_NUMBER=112233445566, akun layanan memiliki format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. Di drop-down Assign roles > Select a role, masukkan Object Reader dan pilih peran Storage Legacy Object Reader.

  7. Klik Add another role.

  8. Di drop-down Select a role, masukkan Bucket Reader, lalu pilih peran Storage Legacy Bucket Reader.

  9. Klik Simpan.

Izin project

Lakukan langkah-langkah berikut untuk memberikan peran Storage Admin (roles/storage.admin) di tingkat project:

  1. Di konsol Google Cloud, buka halaman IAM.

    Buka IAM

  2. Di tab Permissions > View by Principals, klik Grant access.

    Halaman Berikan akses akan terbuka.

  3. Di bagian Add Principals, masukkan nama akun layanan Pub/Sub Anda.

    Format akun layanan adalah service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Misalnya, untuk project dengan PROJECT_NUMBER=112233445566, akun layanan memiliki format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Di drop-down Assign roles > Select a role, masukkan Storage Admin, lalu pilih peran Storage Admin.

  5. Klik Simpan.

Untuk informasi selengkapnya tentang IAM Cloud Storage, lihat Cloud Storage Identity and Access Management.

Properti topik impor Cloud Storage

Untuk mengetahui informasi selengkapnya tentang properti umum di semua topik, lihat Properti topik.

Nama bucket

Ini adalah nama bucket Cloud Storage tempat Pub/Sub membaca data yang dipublikasikan ke topik impor Cloud Storage.

Format input

Saat membuat topik impor Cloud Storage, Anda dapat menentukan format objek yang akan diserap sebagai Teks, Avro, atau Avro Pub/Sub.

  • Teks. Objek diasumsikan menyimpan data dengan teks biasa. Format input ini mencoba menyerap semua objek di bucket selama objek tersebut memenuhi waktu pembuatan objek minimum dan cocok dengan kriteria pola glob.

    Pemisah. Anda juga dapat menentukan pemisah yang digunakan untuk memisahkan objek menjadi pesan. Jika tidak ditetapkan, nilai defaultnya adalah karakter baris baru (\n). Pemisah hanya boleh berupa satu karakter.

  • Avro. Objek dalam format biner Apache Avro. Objek apa pun yang tidak dalam format Apache Avro yang valid tidak akan diserap. Berikut adalah batasan terkait Avro:

    • Avro versi 1.1.0 dan 1.2.0 tidak didukung.
    • Ukuran maksimum blok Avro adalah 16 MB.
  • Pub/Sub Avro. Objek dalam format biner Apache Avro dengan skema yang cocok dengan skema objek yang ditulis ke Cloud Storage menggunakan langganan Cloud Storage Pub/Sub dengan format file Avro. Berikut beberapa panduan penting untuk Avro Pub/Sub:

    • Kolom data data Avro digunakan untuk mengisi kolom data pesan Pub/Sub yang dihasilkan.

    • Jika opsi write_metadata ditentukan untuk langganan Cloud Storage, nilai apa pun di kolom atribut akan diisi sebagai atribut pesan Pub/Sub yang dihasilkan.

    • Jika kunci pengurutan ditentukan dalam pesan asli yang ditulis ke Cloud Storage, kolom ini akan diisi sebagai atribut dengan nama original_message_ordering_key dalam pesan Pub/Sub yang dihasilkan.

Waktu pembuatan objek minimum

Secara opsional, Anda dapat menentukan waktu pembuatan objek minimum saat membuat topik impor Cloud Storage. Hanya objek yang dibuat pada atau setelah stempel waktu ini yang akan ditransfer. Stempel waktu ini harus diberikan dalam format seperti YYYY-MM-DDThh:mm:ssZ. Tanggal apa pun, baik masa lalu maupun mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

Mencocokkan pola glob

Secara opsional, Anda dapat menentukan pola glob pencocokan saat membuat topik impor Cloud Storage. Hanya objek dengan nama yang cocok dengan pola ini yang akan ditransfer. Misalnya, untuk menyerap semua objek dengan akhiran .txt, Anda dapat menentukan pola glob sebagai **.txt.

Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

Membuat topik impor Cloud Storage

Pastikan Anda telah menyelesaikan prosedur berikut:

Membuat topik dan langganan secara terpisah, meskipun dilakukan secara berurutan, dapat menyebabkan hilangnya data. Ada periode singkat saat topik tersedia tanpa langganan. Jika ada data yang dikirim ke topik selama waktu ini, data tersebut akan hilang. Dengan membuat topik terlebih dahulu, membuat langganan, lalu mengonversi topik ke topik impor, Anda menjamin bahwa tidak ada pesan yang terlewat selama proses impor.

Untuk membuat topik impor Cloud Storage, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik Create topic.

    Halaman detail topik akan terbuka.

  3. Di kolom Topic ID, masukkan ID untuk topik impor Cloud Storage Anda.

    Untuk informasi selengkapnya tentang penamaan topik, lihat panduan penamaan.

  4. Pilih Tambahkan langganan default.

  5. Pilih Aktifkan penyerapan.

  6. Untuk sumber transfer, pilih Google Cloud Storage.

  7. Untuk bucket Cloud Storage, klik Browse.

    Halaman Select bucket akan terbuka. Pilih salah satu opsi berikut:

    • Pilih bucket yang ada dari project yang sesuai.

    • Klik ikon buat dan ikuti petunjuk di layar untuk membuat bucket baru. Setelah membuat bucket, pilih bucket untuk topik impor Cloud Storage.

  8. Saat Anda menentukan bucket, Pub/Sub akan memeriksa izin yang sesuai di bucket untuk akun layanan Pub/Sub. Jika ada masalah izin, Anda akan melihat pesan yang mirip dengan berikut ini:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Jika Anda mengalami masalah izin, klik Set permissions. Untuk informasi selengkapnya, lihat Memberikan izin Cloud Storage ke akun layanan Pub/Sub.

  9. Untuk Object format, pilih Text, Avro, atau Pub/Sub Avro.

    Jika memilih Text, Anda dapat menentukan Delimiter secara opsional untuk memisahkan objek menjadi pesan.

    Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.

  10. Opsional. Anda dapat menentukan Waktu pembuatan objek minimum untuk topik Anda. Jika ditetapkan, hanya objek yang dibuat setelah waktu pembuatan objek minimum yang akan ditransfer.

    Untuk informasi selengkapnya, lihat Waktu pembuatan objek minimum.

  11. Anda harus menentukan Pola glob. Untuk menyerap semua objek dalam bucket, gunakan ** sebagai pola glob. Jika ditetapkan, hanya objek yang cocok dengan pola yang diberikan yang akan diserap.

    Untuk informasi selengkapnya, lihat Mencocokkan pola glob.

  12. Pertahankan setelan default lainnya.
  13. Klik Create topic.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Dalam perintah, hanya TOPIC_ID, flag --cloud-storage-ingestion-bucket, dan flag --cloud-storage-ingestion-input-format yang diperlukan. Flag lainnya bersifat opsional dan dapat dihilangkan.

    Ganti kode berikut:

    • TOPIC_ID: Nama atau ID topik Anda.

    • BUCKET_NAME: Menentukan nama bucket yang ada. Contoh, prod_bucket. Nama bucket tidak boleh menyertakan project ID. Untuk membuat bucket, lihat Membuat bucket.

    • INPUT_FORMAT: Menentukan format objek yang diserap. Ini dapat berupa text, avro, atau pubsub_avro. Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.

    • TEXT_DELIMITER: Menentukan pemisah untuk memisahkan objek teks menjadi pesan Pub/Sub. Kolom ini harus berupa satu karakter dan hanya boleh ditetapkan jika INPUT_FORMAT adalah text. Nilai defaultnya adalah karakter baris baru (\n).

      Saat menggunakan gcloud CLI untuk menentukan pemisah, perhatikan dengan cermat penanganan karakter khusus seperti baris baru \n. Gunakan format '\n' untuk memastikan pemisah ditafsirkan dengan benar. Cukup menggunakan \n tanpa tanda kutip atau hasil escape akan menghasilkan pemisah "n".

    • MINIMUM_OBJECT_CREATE_TIME: Menentukan waktu minimum saat objek dibuat agar dapat ditransfer. Format ini harus dalam UTC dalam format YYYY-MM-DDThh:mm:ssZ. Misalnya, 2024-10-14T08:30:30Z.

      Tanggal apa pun, baik masa lalu maupun masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

    • MATCH_GLOB: Menentukan pola glob yang akan dicocokkan agar objek dapat ditransfer. Saat Anda menggunakan gcloud CLI, glob pencocokan dengan karakter * harus memiliki karakter * yang diformat sebagai di-escape dalam bentuk \*\*.txt atau seluruh glob pencocokan harus dalam tanda kutip "**.txt" atau '**.txt'. Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Jika Anda mengalami masalah, lihat Memecahkan masalah topik impor Cloud Storage.

Mengedit topik impor Cloud Storage

Anda dapat mengedit topik impor Cloud Storage untuk memperbarui propertinya.

Misalnya, untuk memulai ulang penyerapan, Anda dapat mengubah bucket atau memperbarui waktu pembuatan objek minimum.

Untuk mengedit topik impor Cloud Storage, lakukan langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud, buka halaman Topics.

    Buka Topik

  2. Klik topik impor Cloud Storage.

  3. Di halaman detail topik, klik Edit.

  4. Perbarui kolom yang ingin diubah.

  5. Klik Perbarui.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Agar tidak kehilangan setelan untuk topik impor, pastikan untuk menyertakan semuanya setiap kali Anda memperbarui topik. Jika Anda mengabaikan sesuatu, Pub/Sub akan mereset setelan ke nilai default aslinya.

    Jalankan perintah gcloud pubsub topics update dengan semua flag yang disebutkan dalam contoh berikut:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ganti kode berikut:

    • TOPIC_ID adalah ID atau nama topik. Kolom ini tidak dapat diperbarui.

    • BUCKET_NAME: Menentukan nama bucket yang ada. Contoh, prod_bucket. Nama bucket tidak boleh menyertakan project ID. Untuk membuat bucket, lihat Membuat bucket.

    • INPUT_FORMAT: Menentukan format objek yang diserap. Ini dapat berupa text, avro, atau pubsub_avro. Lihat Format input untuk mengetahui informasi selengkapnya tentang opsi ini.

    • TEXT_DELIMITER: Menentukan pemisah untuk memisahkan objek teks menjadi pesan Pub/Sub. Kolom ini harus berupa satu karakter dan hanya boleh ditetapkan jika INPUT_FORMAT adalah text. Nilai defaultnya adalah karakter baris baru (\n).

      Saat menggunakan gcloud CLI untuk menentukan pemisah, perhatikan dengan cermat penanganan karakter khusus seperti baris baru \n. Gunakan format '\n' untuk memastikan pemisah ditafsirkan dengan benar. Cukup menggunakan \n tanpa tanda kutip atau hasil escape akan menghasilkan pemisah "n".

    • MINIMUM_OBJECT_CREATE_TIME: Menentukan waktu minimum saat objek dibuat agar dapat ditransfer. Format ini harus dalam UTC dalam format YYYY-MM-DDThh:mm:ssZ. Misalnya, 2024-10-14T08:30:30Z.

      Tanggal apa pun, baik masa lalu maupun masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

    • MATCH_GLOB: Menentukan pola glob yang akan dicocokkan agar objek dapat ditransfer. Saat Anda menggunakan gcloud CLI, glob pencocokan dengan karakter * harus memiliki karakter * yang diformat sebagai di-escape dalam bentuk \*\*.txt atau seluruh glob pencocokan harus dalam tanda kutip "**.txt" atau '**.txt'. Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

Kuota dan batas untuk topik impor Cloud Storage

Throughput penayang untuk topik impor dibatasi oleh kuota publikasi topik. Untuk mengetahui informasi selengkapnya, lihat Kuota dan batas Pub/Sub.

Langkah selanjutnya