建立主題

在 Pub/Sub 中,主題是代表訊息來源的具名資源。您必須先建立主題,然後才能發布或訂閱該主題。Pub/Sub 支援兩種主題:標準主題和匯入主題。

本文說明如何建立 Pub/Sub 標準主題。如要進一步瞭解匯入主題和如何建立匯入主題,請參閱「關於匯入主題」。

如要建立主題,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得建立主題所需的權限,請要求管理員為您授予專案的 Pub/Sub 編輯者(roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備建立主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要建立主題,您必須具備下列權限:

  • 授予這項權限,即可在專案中建立主題: pubsub.topics.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。您可以在一個專案中建立訂閱項目,並將其附加至位於其他專案的主題。請確認您具備每個專案的必要權限。

主題的屬性

建立或更新主題時,必須指定主題的屬性。

新增預設訂閱項目

Adds a default subscription to the Pub/Sub topic. 主題建立後,您可以為該主題建立其他訂閱項目。 預設訂閱方案具有下列屬性:

  • -sub」的訂閱 ID
  • 提取傳送類型
  • 訊息保留時間為七天
  • 持續處於非活躍狀態達 31 天即失效
  • 確認期限為 10 秒
  • 立即重試政策

使用結構定義

結構定義是訊息資料欄位必須遵循的格式。結構定義是發布者和訂閱者之間的合約,Pub/Sub 會強制執行。主題結構定義有助於標準化訊息類型和權限,讓機構中的不同團隊都能使用這些訊息。Pub/Sub 會為訊息類型和權限建立中央授權單位。如要使用結構定義建立主題,請參閱結構定義總覽

啟用擷取功能

啟用這項屬性後,您就能將外部來源的串流資料擷取至主題,以便使用 Google Cloud的功能。如要建立匯入主題以供擷取,請參閱下列內容:

啟用訊息保留功能

指定 Pub/Sub 主題在發布後保留訊息的時間長度。訊息保留期限過後,不論確認狀態為何,Pub/Sub 都可能會捨棄訊息。系統會儲存發布至主題的所有訊息,因此會產生訊息儲存費用

  • 預設值 = 未啟用
  • 最小值 = 10 分鐘
  • 最大值 = 31 天

將訊息資料匯出至 BigQuery

啟用這項屬性後,您就能建立 BigQuery 訂閱項目,在收到訊息時將訊息寫入現有的 BigQuery 資料表。您不需要設定個別的訂閱用戶端。如要進一步瞭解 BigQuery 訂閱項目,請參閱「BigQuery 訂閱項目」。

將訊息資料備份至 Cloud Storage

啟用這項屬性後,您就能建立 Cloud Storage 訂閱項目,在收到訊息時將訊息寫入現有的 Cloud Storage 表格。您不需要設定個別的訂閱用戶端。如要進一步瞭解 Cloud Storage 訂閱方案,請參閱「Cloud Storage 訂閱方案」。

轉換

主題 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可讓您在訊息發布至主題前,進行資料清理、篩選或格式轉換。

如要進一步瞭解 SMT,請參閱 SMT 總覽

Google-owned and Google-managed encryption key

指出主題是使用Google-owned and Google-managed encryption keys加密。Pub/Sub 預設會使用 Google-owned and Google-managed encryption keys 加密訊息,因此選擇這個選項會維持預設行為。Google 會自動處理金鑰管理和輪替作業,確保訊息一律採用最強大的加密機制保護。這個選項不需要進一步設定。 如要進一步瞭解 Google-owned and Google-managed encryption keys,請參閱使用 Google-owned and Google-managed encryption keys進行預設加密

Cloud KMS 金鑰

指定主題是否使用客戶自行管理的加密金鑰 (CMEK) 加密。Pub/Sub 預設會使用 Google-owned and Google-managed encryption keys 加密訊息。如果您指定這個選項,Pub/Sub 會使用信封加密模式搭配 CMEK。採用這種做法時,Cloud KMS 不會加密訊息。而是加密 Pub/Sub 為每個主題建立的資料加密金鑰 (DEK)。Pub/Sub 會使用為主題產生的最新 DEK 加密訊息。Pub/Sub 會在訊息傳送給訂閱者前不久解密。如要進一步瞭解如何建立金鑰,請參閱「設定訊息加密」。

建立主題

您必須先建立主題,然後才能發布或訂閱該主題。

控制台

如要建立主題,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的 Pub/Sub「建立主題」頁面。

    前往「建立主題」

  2. 在「主題 ID」欄位中,輸入主題的 ID。如要進一步瞭解如何命名主題,請參閱命名規範

  3. 保留「新增預設訂閱項目」選項。

  4. (選用步驟) 請勿選取其他選項。

  5. 按一下「建立主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如要建立主題,請執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID
  3. REST

    如要建立主題,請使用 projects.topics.create 方法:

    要求必須使用 Authorization 標頭中的存取權杖進行驗證。如要取得目前應用程式預設憑證的存取權杖,請執行 gcloud auth application-default print-access-token

    PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
    Authorization: Bearer ACCESS_TOKEN
    

    其中:

    • PROJECT_ID 是您的專案 ID。
    • TOPIC_ID 是主題 ID。

    回應:

    {
    "name": "projects/PROJECT_ID/topics/TOPIC_ID"
    }

    C++

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id) {
      auto topic = client.CreateTopic(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      // Note that kAlreadyExists is a possible error when the library retries.
      if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The topic already exists\n";
        return;
      }
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully created: " << topic->DebugString()
                << "\n";
    }

    C#

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    
    public class CreateTopicSample
    {
        public Topic CreateTopic(string projectId, string topicId)
        {
            PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
            var topicName = TopicName.FromProjectTopic(projectId, topicId);
            Topic topic = null;
    
            try
            {
                topic = publisher.CreateTopic(topicName);
                Console.WriteLine($"Topic {topic.Name} created.");
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                Console.WriteLine($"Topic {topicName} already exists.");
            }
            return topic;
        }
    }

    Go

    以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    func create(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	topic := &pubsubpb.Topic{
    		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    	fmt.Fprintf(w, "Topic created: %v\n", t)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicExample(projectId, topicId);
      }
    
      public static void createTopicExample(String projectId, String topicId) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          Topic topic = topicAdminClient.createTopic(topicName);
          System.out.println("Created topic: " + topic.getName());
        }
      }
    }

    Node.js

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    Node.ts

    在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId: string) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    PHP

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     */
    function create_topic($projectId, $topicName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->createTopic($topicName);
    
        printf('Topic created: %s' . PHP_EOL, $topic->name());
    }

    Python

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    topic = publisher.create_topic(request={"name": topic_path})
    
    print(f"Created topic: {topic.name}")

    Ruby

    以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

    在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::PubSub.new
    
    topic_admin = pubsub.topic_admin
    
    topic = topic_admin.create_topic name: pubsub.topic_path(topic_id)
    
    puts "Topic #{topic.name} created."

機構政策限制

機構政策可以限制主題建立作業,例如,政策可以限制訊息儲存在 Compute Engine 區域。為避免建立主題時發生錯誤,請先檢查並更新機構政策 (如有需要),再建立主題。

如果專案是新建立的,請稍候幾分鐘,等機構政策服務初始化後再建立主題。

前往機構政策

詳情請參閱「設定訊息儲存空間政策」。

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。