创建和管理架构

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

本页面介绍了如何为 Pub/Sub 主题创建和管理架构。

架构是一种格式 Pub/Sub 消息的 data 字段。架构会在发布者和订阅者之间就消息的格式创建协定。Pub/Sub 强制执行此格式。架构通过为消息类型和权限创建集中授权,为组织中的数据流提供便利。Pub/Sub 消息架构定义了消息中字段的名称和数据类型。

架构类型

您可以使用以下框架之一在 Pub/Sub 中创建架构:

例如,以下架构一次定义仓库的清单,一次是 Avro 格式,另一次是协议缓冲区格式。

Apache Avro 格式

{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
   {
     "name" : "ProductName",
     "type" : "string",
     "default": ""
   },
   {
     "name" : "SKU",
     "type" : "int",
     "default": 0
   },
   {
     "name" : "InStock",
     "type" : "boolean",
     "default": false
   }
 ]
}

协议缓冲区格式

syntax = "proto3";
message ProtocolBuffer {
  string product_name = 1;
  int32 SKU = 2;
  bool in_stock = 3;
}

准备工作

在开始配置架构之前,请确保您符合以下要求:

  • 启用了 Pub/Sub API 的项目。如需启用 Pub/Sub API,请参阅此快速入门

  • Pub/Sub 主题或创建主题所需的角色和权限。

  • 创建架构所需的角色和权限

管理架构所需的角色和权限

如需获取创建和管理架构所需的权限,请让管理员授予您项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理访问权限

此预定义角色包含创建和管理架构所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

  • 创建架构: pubsub.schemas.create
  • 将架构附加到主题: pubsub.schemas.attach
  • 提交架构修订版本: pubsub.schemas.commit
  • 删除架构或架构修订版本: pubsub.schemas.delete
  • 获取架构或架构修订版本: pubsub.schemas.get
  • 列出架构: pubsub.schemas.list
  • 列出架构修订版本: pubsub.schemas.listRevisions
  • 回滚架构: pubsub.schemas.rollback
  • 验证消息: pubsub.schemas.validate
  • 获取架构的 IAM 政策: pubsub.schemas.getIamPolicy
  • 为架构配置 IAM 政策 pubsub.schemas.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以为主帐号(例如用户、群组、网域或服务帐号)授予角色和权限。您可以在一个项目中创建架构,并将其附加到位于另一个项目中的主题。确保您拥有每个项目所需的权限。

创建架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库创建架构。以下是创建架构的一些准则:

  • 如需让架构与 Pub/Sub 搭配使用,您只能定义一个顶级类型。不支持引用其他类型的导入语句。

  • 您可以将同一架构与多个主题相关联。

  • 您可以手动测试消息是否针对架构进行验证。

控制台

如需创建架构,请按照以下步骤操作:

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 点击创建架构

  3. 架构 ID 字段中,输入架构的 ID。

    如需了解如何为架构命名,请参阅为主题、订阅或快照命名的准则

  4. 对于架构类型,请选择 Avro 或协议缓冲区。

    详细了解架构类型

  5. 架构定义字段中,输入架构的 Avro 或协议缓冲区定义。

    例如,以下是 Avro 中的示例架构。

    {
      "type": "record",
      "name": "Avro",
      "fields": [
        {
          "name": "ProductName",
          "type": "string",
          "default": ""
        },
        {
          "name": "SKU",
          "type": "int",
          "default": 0
        },
        {
          "name": "InStock",
          "type": "boolean",
          "default": false
        }
      ]
    }
    
    
  6. 可选:点击验证定义以检查架构定义是否正确。

    验证检查不会检查架构与要发布的消息的兼容性。在下一步中测试消息。

  7. 可选:您可以测试是否发布具有正确架构的消息。

    1. 点击测试消息

    2. 测试消息窗口中,选择一种消息编码类型。

    3. 邮件正文中,输入测试邮件。

    4. 点击测试

      例如,以下是测试架构的示例消息。 在此示例中,选择消息编码作为 JSON

      {"ProductName":"GreenOnions", "SKU":34543, "InStock":true}
      
    5. 退出测试消息页面。

  8. 点击创建以保存架构。

gcloud

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION 是包含架构定义的 string,系统会根据所选架构类型设置其格式。

您还可以在文件中指定架构定义:

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION_FILE 是一个 string,它包含带有架构定义的文件的路径,其格式取决于所选的架构类型

REST

要创建架构,请发送如下所示的 POST 请求:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

在请求正文中指定以下字段:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
}

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION 是包含架构定义的字符串,系统会根据所选架构类型设置其格式。

响应正文应包含架构资源的 JSON 表示法。例如:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

其中:

  • REVISION_ID 是服务器为修订版本生成的 ID。
  • REVISION_CREATE_TIME 是创建修订版本的 ISO 8601 时间戳。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"js({
    "type": "record",
    "name": "State",
    "namespace": "utilities",
    "doc": "A list of states in the United States of America.",
    "fields": [
      {
        "name": "name",
        "type": "string",
        "doc": "The common name of the state."
      },
      {
        "name": "post_abbr",
        "type": "string",
        "doc": "The postal code abbreviation of the state."
      }
    ]
  })js";
  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(kDefinition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"pfile(
      syntax = "proto3";
      package google.cloud.pubsub.samples;

      message State {
        string name = 1;
        string post_abbr = 2;
      }
      )pfile";
  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(kDefinition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

Avro


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateAvroSchemaSample
{
    public Schema CreateAvroSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            Name = schemaName.ToString(),
            Type = Schema.Types.Type.Avro,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            Parent = "projects/" + projectId,
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Proto


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateProtoSchemaSample
{
    public Schema CreateProtoSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            Name = schemaName.ToString(),
            Type = Schema.Types.Type.ProtocolBuffer,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            Parent = "projects/" + projectId,
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createAvroSchema creates a schema resource from a JSON-formatted Avro schema file.
func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %v", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createProtoSchema creates a schema resource from a schema proto file.
func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %v", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

Avro


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId, avscFile) {
  const definition = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId, protoFile) {
  const definition = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

Avro

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
 * Create a Schema with an AVRO definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $avscFile
 */
function create_avro_schema($projectId, $schemaId, $avscFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = file_get_contents($avscFile);
    $schema = $pubsub->createSchema($schemaId, Type::AVRO, $definition);

    printf('Schema %s created.', $schema->name());
}

Proto

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
 * Create a Schema with an Protocol Buffer definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $protoFile
 */
function create_proto_schema($projectId, $schemaId, $protoFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = file_get_contents($protoFile);
    $schema = $pubsub->createSchema($schemaId, Type::PROTOCOL_BUFFER, $definition);

    printf('Schema %s created.', $schema->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

Avro

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Proto

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

Avro

# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."

Proto

# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

创建架构后,您可以在架构页面中查看架构的详细信息。您现在可以将架构与主题相关联

提交架构修订版本

您可以修改 Pub/Sub 中的架构。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库提交架构修订版本。

以下是提交架构修订版本的一些准则:

  • 您可以在特定限制条件内修改架构:

    • 对于协议缓冲区架构,您可以添加或移除可选字段。您无法添加或删除其他字段。也无法修改任何现有字段。

    • 如需了解 Avro 架构,请参阅 Avro 文档,了解有关架构解析的规则。

    • 一个架构最多可以有 20 个修订版本。 如果您超过了上限,请在创建架构修订版本之前将其删除。

  • 每个修订版本都有唯一的关联 ID。修订版本 ID 是自动生成的 8 个字符 UUID。

  • 更新用于主题验证的架构的修订版本范围或修订版本时,更改可能需要几分钟才能生效。

控制台

如需创建架构修订版本,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 点击现有架构的架构 ID

    系统会打开架构的架构详情页面。

  3. 点击创建修订版本

    系统会打开创建架构修订版本页面。

  4. 根据需要进行更改。

    例如,对于您在创建架构中创建的 Avro 示例架构,您可以添加一个名为 Price 的额外可选字段,如下所示:

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         },
       ]
     }
    
  5. 点击验证定义以检查架构定义是否正确。

  6. 您还可以验证架构的消息。

    1. 点击测试消息以测试示例消息。

    2. 测试消息窗口中,选择一种消息编码类型。

    3. 邮件正文中,输入测试邮件。

      例如,以下是测试架构的示例消息。 在此示例中,选择消息编码作为 JSON

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. 点击测试

  7. 点击提交以保存架构。

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION 是包含架构定义的 string,系统会根据所选架构类型设置其格式。

您还可以在文件中指定架构定义:

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

其中:

  • SCHEMA_TYPEavroprotocol-buffer
  • SCHEMA_DEFINITION_FILE 是一个 string,它包含带有架构定义的文件的路径,其格式取决于所选的架构类型

REST

如需提交架构修订版本,请发送如下所示的 POST 请求:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

在请求正文中指定以下字段:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

其中:

  • SCHEMA_TYPEAVROPROTOCOL_BUFFER
  • SCHEMA_DEFINITION 是包含架构定义的字符串,系统会根据所选架构类型设置其格式。
  • SCHEMA_NAME 是现有架构的名称。

响应正文应包含架构资源的 JSON 表示法。例如:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

其中:

  • REVISION_ID 是服务器为修订版本生成的 ID。
  • REVISION_CREATE_TIME 是创建修订版本的 ISO 8601 时间戳。

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

Avro

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitAvroSchema commits a new avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %v", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := ioutil.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %v", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

提交架构修订版本后,您可以在架构页面中查看新修订版本的详细信息。

将架构与主题相关联

您可以在创建或修改主题时将架构与主题相关联。以下是将架构与主题相关联的准则:

  • 您可以将架构与一个或多个主题相关联。

    架构与主题关联后,主题从发布者收到的每条消息都必须遵循该架构。

  • 将架构与主题相关联时,您还必须指定以 BINARYJSON 形式发布的消息的编码。

  • 如果与主题关联的架构具有修订版本,消息必须与编码匹配,并针对可用范围内的修订版本进行验证。否则,此消息将无法发布。

    系统会根据创建时间以反向时间顺序尝试修订版本。如需创建架构修订版本,请参阅提交架构修订版本

消息架构的验证逻辑

将架构与主题相关联时,如果架构有修订版本,您可以指定要使用的修订版本子集。如果您未指定范围,则整个范围将用于验证。

如果您未指定允许第一个修订版本的修订版本,则系统会使用架构中最早的修订版本进行验证。如果您未指定允许的最后一个修订版本,则系统会使用架构的最新修订版本。

让我们以附加到 T 主题的架构 S 为例。

架构 S 按顺序创建修订版本 ID ABCD,其中 A 是第一个或最早修订版本。所有架构都不完全相同,也不是现有架构的回滚。

  • 如果仅将允许首次修订字段设置为 B,则仅符合架构 A 的消息将被拒绝,而符合架构 BCD 的消息将被接受。

  • 如果您仅将上次允许修订版本字段设置为 C,则系统将接受符合架构 ABC 的消息,并拒绝仅遵循架构 D 的消息。

  • 如果您将允许第一个修订版本设置为 B 并将上次修订允许设置为 C,则接受符合架构 BC 的消息。

  • 您也可以将第一个修订版本和最后一个修订版本设置为同一修订版本 ID。在这种情况下,我们只接受符合该修订版本的邮件。

创建主题时创建并关联架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库创建包含架构的主题。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 主题页面。

    打开“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。

    如要为主题命名,请参阅准则

  4. 选中使用架构复选框。

    保留其余字段的默认设置。

    您可以创建架构或使用现有架构。

  5. 如果您要创建架构,请按以下步骤操作: `

    1. 对于选择 Pub/Sub 架构,选择创建新架构

    创建架构页面显示在辅助标签页中。

    按照创建架构中的步骤操作。

    1. 返回创建主题标签页,然后点击刷新

    2. 选择 Pub/Sub 架构字段中搜索您的架构。

    3. 选择消息编码:JSON二进制

    您刚刚创建的架构有一个修订版本 ID。您可以按照提交架构修订版本中的说明创建其他架构修订版本。

  6. 如果您要关联已创建的架构,请按以下步骤操作:

    1. 选择 Pub/Sub 架构部分,选择一个现有架构。

    2. 选择消息编码:JSON二进制

  7. 可选:如果所选架构具有修订版本,对于 Revision Range,请使用 First revision allowLast revision allow 的下拉菜单。

您可以指定两个字段,仅指定一个字段,也可以根据自己的要求保留默认设置。

  1. 保留其余字段的默认设置。

  2. 点击创建以保存该主题,并将其分配给所选架构。

gcloud

如需创建使用先前创建的架构分配的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您正在创建的主题的 ID。
  • ENCODING_TYPE 是针对架构验证的消息的编码。该值必须设置为 JSONBINARY
  • SCHEMA_ID 是现有架构的 ID。
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 都是可选的。

您也可以从其他 Google Cloud 项目分配架构:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

其中:

  • SCHEMA_PROJECT 是架构的 Google Cloud 项目的项目 ID。
  • TOPIC_PROJECT 是主题的 Google Cloud 项目的项目 ID。

REST

如需创建主题,请使用 projects.topics.create 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是您的主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。格式为 projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 都是可选的。

响应:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

如果请求中未提供 firstRevisionIdlastRevisionId,则系统会将其省略。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  auto const& schema = pubsub::Schema(project_id, std::move(schema_id));
  auto topic = client.CreateTopic(
      pubsub::TopicBuilder(
          pubsub::Topic(std::move(project_id), std::move(topic_id)))
          .set_schema(schema)
          .set_encoding(encoding == "JSON" ? google::pubsub::v1::JSON
                                           : google::pubsub::v1::BINARY));
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaName, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            Name = topicName.ToString(),
            SchemaSettings = new SchemaSettings
            {
                Schema = schemaName,
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string, encoding pubsub.SchemaEncoding) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	// encoding := pubsub.EncodingJSON
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	tc := &pubsub.TopicConfig{
		SchemaSettings: &pubsub.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
			Encoding:        encoding,
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, tc)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %v", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :binary
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.create_topic topic_id, schema_name: schema_id, message_encoding: message_encoding

puts "Topic #{topic.name} created."

修改与主题关联的架构

您可以修改主题以附加架构、移除架构或更新用于验证消息的修订版本范围。一般来说,如果您计划为使用的架构更改,则可以提交新的修订版本并更新主题的修订版本范围。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库修改与主题关联的架构。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 主题页面。

    打开“主题”

  2. 点击主题的主题 ID

  3. 在主题详情页面中,点击修改

  4. 您可以对架构进行以下更改。

    更改可能需要几分钟时间才能生效。

    • 如果要从主题中移除架构,请在修改主题页面中取消选中使用架构复选框。

    • 如果要更改架构,请在架构部分选择架构的名称。

    根据需要更新其他字段。

    • 如果要更新修订版本范围,对于 Revision Range,请使用 First revision allowLast revision allow 的下拉菜单。

    您可以指定两个字段,仅指定一个字段,也可以根据自己的要求保留默认设置。

  5. 点击更新以保存更改。

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您正在创建的主题的 ID。
  • ENCODING_TYPE 是针对架构验证的消息的编码。该值必须设置为 JSONBINARY
  • SCHEMA_NAME 是现有架构的名称。
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 都是可选的。

REST

如需更新主题,请使用 projects.topics.update 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是您的主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。格式为 projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是要验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 都是可选的。

响应:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

更新后,系统不会同时设置 firstRevisionIdlastRevisionId

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	t := client.Topic(topicID)

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings.
	tc := pubsub.TopicConfigToUpdate{
		SchemaSettings: &pubsub.SchemaSettings{
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
		},
	}

	gotTopicCfg, err := t.Update(ctx, tc)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")

回滚架构修订版本

您可以回滚到特定的架构修订版本。 借助回滚操作,您可以使用与之前的修订版本完全相同的架构定义创建另一个架构修订版本。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库回滚架构。请按照以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 点击现有架构的名称。

    系统会打开架构的架构详情页面。

  3. 点击回滚

    系统会打开回滚架构对话框。

  4. 选择要将架构回滚到的修订版本。

  5. 点击确认,保存回滚操作。

    系统会使用修订版本处理操作中指定的架构创建新修订版本。

  6. 架构详细信息页面中,选择最新版本的架构和所选版本作为回滚操作的来源。

  7. 点击查看差异

    您可以验证这两个架构是否相同。

    您可以通过更新上次允许的修订版本字段,将您刚刚创建的架构修订版本用作主题的最后验证。

gcloud

gcloud pubsub schemas rollback SCHEMA_ID \
        --revision-id=REVISION_ID

其中:

  • REVISION_ID 是您要回滚到的修订版本。

REST

要回滚架构,请发送如下所示的 POST 请求:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:rollback
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

在请求正文中指定以下字段:

{
  "revisionId": REVISION_KD
}

其中:

  • REVISION_KD 是要回滚到的修订版本的 ID。

响应正文应包含架构资源的 JSON 表示法。

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// rollbackSchema creates a new schema revision that is a copy of the provided revisionID.
func rollbackSchema(w io.Writer, projectID, schemaID, revisionID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// revisionID := "a1b2c3d4"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	s, err := client.RollbackSchema(ctx, schemaID, revisionID)
	if err != nil {
		return fmt.Errorf("RollbackSchema: %v", err)
	}
	fmt.Fprintf(w, "Rolled back a schema: %#v\n", s)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class RollbackSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project";
    String schemaId = "your-schema";
    String revisionId = "your-revision";

    rollbackSchemaExample(projectId, schemaId, revisionId);
  }

  public static void rollbackSchemaExample(String projectId, String schemaId, String revisionId)
      throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema = schemaServiceClient.rollbackSchema(schemaName, revisionId);

      System.out.println("Rolled back a schema:" + schema);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# schema_revision_id = "your-schema-revision-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    result = schema_client.rollback_schema(
        request={"name": schema_path, "revision_id": schema_revision_id}
    )
    print(f"Rolled back a schema revision:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

删除架构修订版本

以下是删除架构修订版本的一些重要准则:

  • 您可以从架构中删除一个或多个架构修订版本。

  • 如果架构只有一个修订版本,则无法删除该修订版本。 请改为删除架构。

  • 架构的删除操作还会删除与该架构关联的所有修订版本。

  • 如果您删除架构,则向与该架构关联的主题发布消息会失败。

  • 如果您删除架构修订版本,并且指定为主题的第一个修订版本,则下一个修订版本将用于验证。

    如果将已删除的架构修订版本指定为主题的最后一个修订版本,则将改用上一个架构修订版本进行验证。

    如果已删除的架构修订版本位于主题的指定修订版本范围内以进行验证,系统会跳过该修订版本。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库删除架构修订版本。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 点击现有架构的名称。

    系统会打开架构的架构详情页面。

  3. 选择要删除的修订版本。您还可以选择多个修订版本。

  4. 点击删除修订版本

  5. 确认删除操作。

gcloud

gcloud pubsub schemas delete-revision SCHEMA_NAME@REVISION_ID

其中:

  • REVISION_ID 是您要回滚到的修订版本。

REST

要删除架构修订版本,请发送如下所示的 DELETE 请求:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID@REVISION_ID:deleteRevision
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

响应正文应包含代表已删除架构资源的 JSON。

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func deleteSchemaRevision(w io.Writer, projectID, schemaID, revisionID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// revisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	if _, err := client.DeleteSchemaRevision(ctx, schemaID, revisionID); err != nil {
		return fmt.Errorf("client.DeleteSchema revision: %v", err)
	}
	fmt.Fprintf(w, "Deleted a schema revision: %s@%s", schemaID, revisionID)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.DeleteSchemaRevisionRequest;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class DeleteSchemaRevisionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id@your-revision-id";

    deleteSchemaRevisionExample(projectId, schemaId);
  }

  public static void deleteSchemaRevisionExample(String projectId, String schemaId)
      throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      DeleteSchemaRevisionRequest request =
          DeleteSchemaRevisionRequest.newBuilder().setName(schemaName.toString()).build();

      schemaServiceClient.deleteSchemaRevision(request);

      System.out.println("Deleted a schema revision:" + schemaName);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# revision_id = "your-revision-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id + "@" + revision_id)

try:
    schema_client.delete_schema_revision(request={"name": schema_path})
    print(f"Deleted a schema revision:\n{schema_path}")
except NotFound:
    print(f"{schema_id} not found.")

删除架构

在删除架构之前,请确保移除其与主题的关联

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库删除架构。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 选择要删除的一个或多个架构。

  3. 点击删除

  4. 确认删除操作。

gcloud

gcloud pubsub schemas delete SCHEMA_NAME

REST

如需删除架构,请发送如下所示的 DELETE 请求:

DELETE https://pubsub.googleapis.com/v1/SCHEMA_NAME

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id) {
  google::pubsub::v1::DeleteSchemaRequest request;
  request.set_name(pubsub::Schema(project_id, schema_id).FullName());
  auto status = client.DeleteSchema(request);
  // Note that kNotFound is a possible result when the library retries.
  if (status.code() == google::cloud::StatusCode::kNotFound) {
    std::cout << "The schema was not found\n";
    return;
  }
  if (!status.ok()) throw std::runtime_error(status.message());

  std::cout << "Schema successfully deleted\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;

public class DeleteSchemaSample
{
    public void DeleteSchema(string projectId, string schemaId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        SchemaName schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        schemaService.DeleteSchema(schemaName);
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func deleteSchema(w io.Writer, projectID, schemaID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	if err := client.DeleteSchema(ctx, schemaID); err != nil {
		return fmt.Errorf("client.DeleteSchema: %v", err)
	}
	fmt.Fprintf(w, "Deleted schema: %s", schemaID)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class DeleteSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    deleteSchemaExample(projectId, schemaId);
  }

  public static void deleteSchemaExample(String projectId, String schemaId) throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      schemaServiceClient.deleteSchema(schemaName);

      System.out.println("Deleted a schema:" + schemaName);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function deleteSchema(schemaNameOrId) {
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();
  await schema.delete();
  console.log(`Schema ${name} deleted.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Delete a schema.
 *
 * @param string $projectId
 * @param string $schemaId
 */
function delete_schema($projectId, $schemaId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    if ($schema->exists()) {
        $schema->delete();

        printf('Schema %s deleted.', $schema->name());
    } else {
        printf('Schema %s does not exist.', $schema->name());
    }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    schema_client.delete_schema(request={"name": schema_path})
    print(f"Deleted a schema:\n{schema_path}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id
schema.delete

puts "Schema #{schema_id} deleted."

查看架构详情

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库获取架构的详细信息。结果将返回最新修订版本 ID 的详细信息。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

  2. 点击要查看的架构的名称。

    系统会打开架构的架构详情页面。

gcloud

如需查看架构的最新修订版本,请执行以下操作:

gcloud pubsub schemas describe SCHEMA_NAME

如需查看架构的特定修订版本,请执行以下操作:

gcloud pubsub schemas describe SCHEMA_ID@REVISION_ID

其中:

  • REVISION_ID 是您要回滚到的修订版本。

REST

如需获取架构的最新修订版本的详细信息,请发送如下所示的 GET 请求:

GET https://pubsub.googleapis.com/v1/SCHEMA_NAME

如需获取架构特定修订版本的详细信息,请发送如下所示的 GET 请求:

GET https://pubsub.googleapis.com/v1/SCHEMA_NAME@REVISION_ID

如果成功,响应正文将包含一个架构类实例。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id) {
  google::pubsub::v1::GetSchemaRequest request;
  request.set_name(pubsub::Schema(project_id, schema_id).FullName());
  request.set_view(google::pubsub::v1::FULL);
  auto schema = client.GetSchema(request);
  if (!schema) throw std::move(schema).status();

  std::cout << "The schema exists and its metadata is: "
            << schema->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;

public class GetSchemaSample
{
    public Schema GetSchema(string projectId, string schemaId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        SchemaName schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        GetSchemaRequest request = new GetSchemaRequest
        {
            Name = schemaName.ToString(),
            View = SchemaView.Full
        };

        return schemaService.GetSchema(request);
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func getSchemaRevision(w io.Writer, projectID, schemaID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema[@my-schema-revision]"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	s, err := client.Schema(ctx, schemaID, pubsub.SchemaViewFull)
	if err != nil {
		return fmt.Errorf("client.Schema revision: %v", err)
	}
	fmt.Fprintf(w, "Got schema revision: %#v\n", s)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class GetSchemaRevisionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id[@your-schema-revision]";
    getSchemaRevisionExample(projectId, schemaId);
  }

  public static void getSchemaRevisionExample(String projectId, String schemaId)
      throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema = schemaServiceClient.getSchema(schemaName);

      System.out.println("Got a schema:\n" + schema);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function getSchema(schemaNameOrId) {
  const schema = pubSubClient.schema(schemaNameOrId);
  const info = await schema.get();
  const fullName = await schema.getName();
  console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Get a schema.
 *
 * @param string $projectId
 * @param string $schemaId
 */
function get_schema($projectId, $schemaId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);
    $schema->info();

    printf('Schema %s retrieved', $schema->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# schema_revision_id = "your-schema-revision-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(
    project_id, schema_id + "@" + schema_revision_id
)

try:
    result = schema_client.get_schema(request={"name": schema_path})
    print(f"Got a schema revision:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id

puts "Schema #{schema.name} retrieved."

列出架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库列出 Google Cloud 项目中的架构。

控制台

  • 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

    系统会显示架构列表。

gcloud

gcloud pubsub schemas list

使用 gcloud pubsub schemas list --view=FULL 命令可查看每个架构的最新定义。

REST

要列出项目中的架构,请发送如下所示的 GET 请求:

GET https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas

如果成功,响应正文将包含一个 JSON 对象,该对象包含项目中所有架构的最新修订版本

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id) {
  auto const parent = google::cloud::Project(project_id).FullName();
  for (auto& s : client.ListSchemas(parent)) {
    if (!s) throw std::move(s).status();
    std::cout << "Schema: " << s->DebugString() << "\n";
  }
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;

public class ListSchemasSample
{
    public IEnumerable<Schema> ListSchemas(string projectId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        ProjectName projectName = ProjectName.FromProject(projectId);
        var schemas = schemaService.ListSchemas(projectName);
        return schemas;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func listSchemas(w io.Writer, projectID string) ([]*pubsub.SchemaConfig, error) {
	// projectID := "my-project-id"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	var schemas []*pubsub.SchemaConfig

	schemaIter := client.Schemas(ctx, pubsub.SchemaViewFull)
	for {
		sc, err := schemaIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("schemaIter.Next: %v", err)
		}
		fmt.Fprintf(w, "Got schema: %#v\n", sc)
		schemas = append(schemas, sc)
	}

	fmt.Fprintf(w, "Got %d schemas", len(schemas))
	return schemas, nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import java.io.IOException;

public class ListSchemasExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";

    listSchemasExample(projectId);
  }

  public static void listSchemasExample(String projectId) throws IOException {
    ProjectName projectName = ProjectName.of(projectId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
      for (Schema schema : schemaServiceClient.listSchemas(projectName).iterateAll()) {
        System.out.println(schema);
      }
      System.out.println("Listed schemas.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档


// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listSchemas() {
  for await (const s of pubSubClient.listSchemas()) {
    console.log(s.name);
  }
  console.log('Listed schemas.');
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * List schemas in the project.
 *
 * @param string $projectId
 */
function list_schemas($projectId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schemas = $pubsub->schemas();
    foreach ($schemas as $schema) {
        printf('Schema name: %s' . PHP_EOL, $schema->name());
    }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"

project_path = f"projects/{project_id}"
schema_client = SchemaServiceClient()

for schema in schema_client.list_schemas(request={"parent": project_path}):
    print(schema)

print("Listed schemas.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schemas = pubsub.schemas

puts "Schemas in project:"
schemas.each do |schema|
  puts schema.name
end

列出架构的修订版本

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库列出 Google Cloud 项目中架构的修订版本。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

    系统会显示架构列表。

  2. 点击要查看的架构的名称。

    系统会打开架构的架构详情页面。

    修订版本部分,您可以查看架构的可用修订版本列表。

gcloud

如需查看架构的最新修订版本,请执行以下操作:

gcloud pubsub schemas list-revisions SCHEMA_ID

使用 gcloud pubsub schemas list-revisions <var>SCHEMA_ID</var> --view=FULL 命令可查看架构修订版本的定义。

REST

如需列出架构的架构修订版本,请发送如下所示的 GET 请求:

GET https://pubsub.googleapis.com/v1/projects/SCHEMA_NAME:listRevisions

如果成功,响应正文将包含一个 JSON 对象,其中包含架构的所有架构修订版本

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func listSchemaRevisions(w io.Writer, projectID, schemaID string) ([]*pubsub.SchemaConfig, error) {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	var schemas []*pubsub.SchemaConfig

	schemaIter := client.ListSchemaRevisions(ctx, schemaID, pubsub.SchemaViewFull)
	for {
		sc, err := schemaIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("schemaIter.Next: %v", err)
		}
		fmt.Fprintf(w, "Got schema revision: %#v\n", sc)
		schemas = append(schemas, sc)
	}

	fmt.Fprintf(w, "Got %d schema revisions", len(schemas))
	return schemas, nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class ListSchemaRevisionsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    listSchemaRevisionsExample(projectId, schemaId);
  }

  public static void listSchemaRevisionsExample(String projectId, String schemaId)
      throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
      for (Schema schema : schemaServiceClient.listSchemaRevisions(schemaName).iterateAll()) {
        System.out.println(schema);
      }
      System.out.println("Listed schema revisions.");
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

for schema in schema_client.list_schema_revisions(request={"name": schema_path}):
    print(schema)

print("Listed schema revisions.")

验证架构定义

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库验证架构。

按照创建架构提交架构修订版本中所述的步骤操作。

验证检查不会检查新修订版本与旧修订版本的兼容性。

验证架构的消息

您可以在创建架构资源之前或之后验证消息是否遵循特定架构。此步骤可确保在应用架构之前,您打算通过与架构关联的主题发送的消息实际上是匹配的。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库针对架构验证消息。

控制台

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 架构页面。

    转到“架构”

    系统会显示架构列表。

  2. 点击现有架构的架构 ID

    系统会打开架构的架构详情页面。

  3. 修订版本部分中,点击要为其验证消息的修订版本。

  4. 详细信息部分,点击测试消息

  5. 测试消息窗口中,选择一种消息编码类型。

  6. 消息正文中,输入测试消息。

  7. 点击测试

架构的配额和限制

架构具有以下限制:

  • 架构定义字段的大小不得超过 50 KB。

  • 一个项目最多可以有 10000 个架构。

  • 单个架构最多只能有 20 个修订版本。

    如需了解与 Pub/Sub 相关的架构和限制,请参阅 Pub/Sub 配额和限制

后续步骤