将架构与主题相关联

本文档介绍如何关联 Pub/Sub 主题的架构。

准备工作

所需的角色和权限

如需获取关联和管理架构所需的权限,请让管理员授予您项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色包含关联和管理架构所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需关联和管理架构,您需要具备以下权限:

  • 创建架构: pubsub.schemas.create
  • 将架构附加到主题: pubsub.schemas.attach
  • 提交架构修订版本: pubsub.schemas.commit
  • 删除架构或架构修订版本: pubsub.schemas.delete
  • 获取架构或架构修订版本: pubsub.schemas.get
  • 列出架构: pubsub.schemas.list
  • 列出架构修订版本: pubsub.schemas.listRevisions
  • 回滚架构: pubsub.schemas.rollback
  • 验证消息: pubsub.schemas.validate
  • 获取架构的 IAM 政策: pubsub.schemas.getIamPolicy
  • 为架构配置 IAM 政策 pubsub.schemas.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以向主账号(例如用户、群组、网域或服务帐号)授予角色和权限。您可以在一个项目中创建架构,并将其挂接到位于其他项目中的主题。确保您拥有每个项目所需的权限。

将架构与主题相关联的准则

您可以在创建或修改主题时将架构与主题相关联。 以下是将架构与主题相关联的准则:

  • 您可以将一个架构与一个或多个主题相关联。

    架构与主题关联后,该主题从发布者接收的每条消息都必须遵循该架构。

  • 将架构与主题关联时,您还必须将要发布的消息的编码指定为 BINARYJSON。如果将 JSON 与 Avro 架构一起使用,请密切注意联合的编码规则

  • 如果与主题关联的架构具有修订版本,则消息必须与编码匹配,并根据可用范围内的修订版本进行验证。如果不通过验证,消息就无法发布。

    系统会根据创建时间按时间倒序尝试修订版本。如需创建架构修订版本,请参阅提交架构修订版本

消息架构的验证逻辑

将架构与主题关联且架构具有修订版本时,您可以指定要使用的修订版本的子集。如果您未指定范围,则整个范围将用于验证。

如果您未将某个修订版本指定为允许的第一个修订版本,则系统会使用架构的最早的现有修订版本进行验证。如果您未将某个修订版本指定为允许的最后一个修订版本,则系统会使用架构的最新现有修订版本。

我们来看一下附加到主题 T 的架构 S 的示例。

架构 S 具有按顺序创建的修订版本 ID ABCD,其中 A 是第一个或最早的修订版本。所有架构都不完全相同,也不会出现现有架构的回滚。

  • 如果您只将允许的第一个修订版本字段设置为 B,则仅符合架构 A 的消息将被拒绝,而符合架构 BCD 的消息会被接受。

  • 如果您只将允许的最新修订版本字段设置为 C,则系统将接受符合架构 ABC 的消息,而仅符合架构 D 的消息将被拒绝。

  • 如果您将允许的第一个修订版本这两个字段设置为 B,将允许的最后一个修订版本这两个字段设置为 C,则系统会接受符合架构 BC 的消息。

  • 您还可以将第一个修订版本和最后一个修订版本设置为相同的修订版本 ID。在这种情况下,只有符合该修订版本的消息才会被接受。

创建主题时创建并关联架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库创建具有架构的主题。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 主题页面。

    打开“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。

    如要为主题命名,请参阅相关指南

  4. 选中使用架构复选框。

    保留其余字段的默认设置。

    您可以创建架构或使用现有架构。

  5. 如果您要创建架构,请按以下步骤操作: `

    1. 选择 Pub/Sub 架构部分,选择创建新架构

    辅助标签页中会显示创建架构页面。

    按照创建架构中的步骤操作。

    1. 返回创建主题标签页,然后点击刷新

    2. 选择 Pub/Sub 架构字段中搜索您的架构。

    3. 选择消息编码方式:JSON二进制

    您刚刚创建的架构有一个修订版本 ID。您可以按照提交架构修订版本中的说明创建其他架构修订版本。

  6. 如果要关联已创建的架构,请按以下步骤操作:

    1. 选择 Pub/Sub 架构部分,选择一个现有架构。

    2. 选择消息编码方式:JSON二进制

  7. 可选:如果所选架构具有修订版本,请在修订版本范围部分,使用允许的第一个修订版本允许的最后一个修订版本下拉菜单。

您可以指定两个字段、仅指定一个字段,也可以根据您的要求保留默认设置。

  1. 保留其余字段的默认设置。

  2. 点击创建以保存主题,并将其分配给所选架构。

gcloud

如需创建分配有先前创建的架构的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您要创建的主题的 ID。
  • ENCODING_TYPE 是针对架构验证的消息的编码。该值必须设置为 JSONBINARY
  • SCHEMA_ID 是现有架构的 ID。
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要进行验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 都是可选的。

您也可以从其他 Google Cloud 项目分配架构:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

其中:

  • SCHEMA_PROJECT 是架构的 Google Cloud 项目的项目 ID。
  • TOPIC_PROJECT 是主题的 Google Cloud 项目的项目 ID。

REST

如需创建主题,请使用 projects.topics.create 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。其格式为:projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要进行验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 都是可选的。

响应:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

如果请求中未提供 firstRevisionIdlastRevisionId,则会被省略。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  google::pubsub::v1::Topic request;
  request.set_name(pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.mutable_schema_settings()->set_schema(
      pubsub::Schema(std::move(project_id), std::move(schema_id)).FullName());
  request.mutable_schema_settings()->set_encoding(
      encoding == "JSON" ? google::pubsub::v1::JSON
                         : google::pubsub::v1::BINARY);
  auto topic = client.CreateTopic(request);

  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaId, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            TopicName = topicName,
            SchemaSettings = new SchemaSettings
            {
                SchemaAsSchemaName = SchemaName.FromProjectSchema(projectId, schemaId),
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string, encoding pubsub.SchemaEncoding) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	// encoding := pubsub.EncodingJSON
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	tc := &pubsub.TopicConfig{
		SchemaSettings: &pubsub.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
			Encoding:        encoding,
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, tc)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %w", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId: string,
  schemaNameOrId: string,
  encodingType: 'BINARY' | 'JSON'
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :binary

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.create_topic topic_id, schema_name: schema_id, message_encoding: message_encoding

puts "Topic #{topic.name} created."

修改与主题关联的架构

您可以修改主题,以附加架构、移除架构或更新用于验证消息的修订版本范围。一般来说,如果您计划对正在使用的架构进行更改,则可以提交新的修订版本,并更新用于主题的修订版本范围。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库修改与主题关联的架构。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 主题页面。

    打开“主题”

  2. 点击相应主题的主题 ID

  3. 在主题详情页面中,点击修改

  4. 您可以对架构进行以下更改。

    更改可能需要几分钟的时间才会生效。

    • 如果要从主题中移除架构,请在修改主题页面中,取消选中使用架构复选框。

    • 如果要更改架构,请在 Schema 部分中选择架构名称。

    根据需要更新其他字段。

    • 对于修订版本范围,如果您要更新修订版本范围,请使用允许的第一个修订版本允许的最后一个修订版本下拉菜单。

    您可以指定两个字段、仅指定一个字段,也可以根据您的要求保留默认设置。

  5. 点击更新以保存更改。

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您要创建的主题的 ID。
  • ENCODING_TYPE 是针对架构验证的消息的编码。该值必须设置为 JSONBINARY
  • SCHEMA_NAME 是现有架构的名称。
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要进行验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 都是可选的。

REST

如需更新主题,请使用 projects.topics.patch 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。其格式为:projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要进行验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 都是可选的。

响应:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

更新后,firstRevisionIdlastRevisionId 均不会设置。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& first_revision_id,
   std::string const& last_revision_id) {
  google::pubsub::v1::UpdateTopicRequest request;
  auto* request_topic = request.mutable_topic();
  request_topic->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  request_topic->mutable_schema_settings()->set_first_revision_id(
      first_revision_id);
  request_topic->mutable_schema_settings()->set_last_revision_id(
      last_revision_id);
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.first_revision_id";
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.last_revision_id";
  auto topic = client.UpdateTopic(request);

  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	t := client.Topic(topicID)

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings.
	tc := pubsub.TopicConfigToUpdate{
		SchemaSettings: &pubsub.SchemaSettings{
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
		},
	}

	gotTopicCfg, err := t.Update(ctx, tc)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")
0

后续步骤