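Creating and managing schemas

This page describes how to create and manage schemas for Pub/Sub topics.

A schema is a format that messages must follow. It creates a contract between publishers and subscribers that Pub/Sub enforces. By creating a central authority for message types and permissions, schemas make it easier for teams within an organization to consume each other's data streams.

A Pub/Sub schema defines the names and data types of the fields in a message. You can create a schema as a standalone, versioned resource, associate it with multiple Pub/Sub topics, and use it to validate the structure of published messages.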

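Creating a schema

You can create a schema and then assign it to one or more topics, or you can create a schema while creating a topic. After a schema is assigned to a topic, every message that the topic receives from publishers must follow that schema.

You can create a schema using the Cloud Console, the gcloud tool, or the Pub/Sub API:

Console

To create a schema, follow these steps:

  1. In the Cloud Console, go to the Pub/Sub schemas page.

    Go to the Schemas page

  2. Click Create schema.

  3. In the Schema ID field, enter an ID for your schema.

  4. For Schema type, select either Avro or Protocol Buffer. See Schema types below for details.

  5. In the Schema definition field, enter the Avro or Protocol Buffer definition for your schema.

  6. Click Create to save the schema.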

gcloud

gcloud beta pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Where:

  • SCHEMA_TYPE is either AVRO or PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.
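
Because SCHEMA_DEFINITION is passed as a single string, quoting matters when the command is assembled by a script. The following Python sketch builds the argument list for the command above and prints a shell-safe version of it. The function name, the schema ID `my-schema`, and the one-line proto definition are hypothetical values chosen for illustration; only the flags shown above are used.

```python
import shlex


def build_create_schema_command(schema_id, schema_type, definition):
    """Return the argument list for `gcloud beta pubsub schemas create`."""
    if schema_type not in ("AVRO", "PROTOCOL_BUFFER"):
        raise ValueError(f"unsupported schema type: {schema_type}")
    return [
        "gcloud", "beta", "pubsub", "schemas", "create", schema_id,
        f"--type={schema_type}",
        f"--definition={definition}",
    ]


# Hypothetical schema ID and definition, for illustration only.
command = build_create_schema_command(
    "my-schema",
    "PROTOCOL_BUFFER",
    'syntax = "proto3"; message State { string name = 1; }',
)

# shlex.join quotes the definition so that the shell sees it as one argument.
print(shlex.join(command))
```

Passing the list to `subprocess.run(command)` instead of printing it would avoid shell quoting altogether, assuming the gcloud tool is installed and authenticated.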

REST

To create a schema, send a POST request like the following:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json

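Specify the following fields in the request body:

{
  "definition": SCHEMA_DEFINITION,
  "type": SCHEMA_TYPE
}

Where:

  • SCHEMA_TYPE is either AVRO or PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.

The response body should contain a JSON representation of a schema resource. For example: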

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
}
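
Because the definition field is a string, an Avro definition (itself JSON) has to be embedded as an escaped string rather than as a nested object. The following Python sketch shows one way to build such a request body with the standard library. The function name is hypothetical, and the Avro record is a shortened version of the State example used in the client library samples on this page.

```python
import json


def build_create_schema_body(schema_type, definition):
    """Serialize the request body for creating a schema."""
    if schema_type not in ("AVRO", "PROTOCOL_BUFFER"):
        raise ValueError(f"unsupported schema type: {schema_type}")
    if not isinstance(definition, str):
        raise TypeError("definition must be a string")
    return json.dumps({"definition": definition, "type": schema_type})


# The Avro definition is serialized to a string first...
avro_definition = json.dumps({
    "type": "record",
    "name": "State",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "post_abbr", "type": "string"},
    ],
})

# ...and then embedded, escaped, inside the request body.
body = build_create_schema_body("AVRO", avro_definition)
print(body)
```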

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

Avro
namespace pubsub = google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"js({
    "type": "record",
    "name": "State",
    "namespace": "utilities",
    "doc": "A list of states in the United States of America.",
    "fields": [
      {
        "name": "name",
        "type": "string",
        "doc": "The common name of the state."
      },
      {
        "name": "post_abbr",
        "type": "string",
        "doc": "The postal code abbreviation of the state."
      }
    ]
  })js";
  auto schema = client.CreateAvroSchema(pubsub::Schema(project_id, schema_id),
                                        kDefinition);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::runtime_error(schema.status().message());

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}
Proto
namespace pubsub = google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"pfile(
      syntax = "proto3";
      package google.cloud.pubsub.samples;

      message State {
        string name = 1;
        string post_abbr = 2;
      }
      )pfile";
  auto schema = client.CreateProtobufSchema(
      pubsub::Schema(project_id, schema_id), kDefinition);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) return;  // TODO(#4792) - protobuf schema support in emulator
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

Avro

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static void createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + " already exists.");
    }
  }
}
Protocol buffers

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static void createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + " already exists.");
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

Avro
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")
Protocol buffers
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

Avro
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."
Protocol buffers
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

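To use a schema, you must assign it to a topic. A single schema can be associated with any number of topics.

Schema types

You can create schemas using either of the following frameworks: Avro or protocol buffers.

For example, the following schemas define messages with a string field, a float field, and a boolean field: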

Avro

{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
   {
     "name" : "StringField",
     "type" : "string"
   },
   {
     "name" : "FloatField",
     "type" : "float"
   },
   {
     "name" : "BooleanField",
     "type" : "boolean"
   }
 ]
}

Protocol buffers

syntax = "proto3";
message ProtocolBuffer {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}
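
To make the idea of "a message that follows the schema" concrete, the following Python sketch checks a decoded JSON message against the three fields of the Avro example above. It is a rough local approximation that covers only the primitive types used in that example. It is not how Pub/Sub validates messages and not a replacement for an Avro library; the function name and the type mapping are assumptions made for illustration.

```python
import json

# The Avro example from this section, as a JSON string.
AVRO_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Avro",
  "fields": [
    {"name": "StringField", "type": "string"},
    {"name": "FloatField", "type": "float"},
    {"name": "BooleanField", "type": "boolean"}
  ]
}
""")

# Illustrative mapping from the Avro primitives used above to Python types.
_PRIMITIVES = {"string": str, "float": (int, float), "boolean": bool}


def conforms(message, schema):
    """Return True if the message has exactly the schema's fields and types."""
    fields = schema["fields"]
    if set(message) != {field["name"] for field in fields}:
        return False
    for field in fields:
        value = message[field["name"]]
        # bool is a subclass of int in Python, so reject it for float fields.
        if field["type"] == "float" and isinstance(value, bool):
            return False
        if not isinstance(value, _PRIMITIVES[field["type"]]):
            return False
    return True


good = {"StringField": "hello", "FloatField": 1.5, "BooleanField": True}
bad = {"StringField": "hello", "FloatField": "1.5", "BooleanField": True}
print(conforms(good, AVRO_SCHEMA), conforms(bad, AVRO_SCHEMA))
```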

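Getting schema details

You can get the details of a schema by name using the Cloud Console, the gcloud tool, or the Pub/Sub API:

Console

  1. In the Cloud Console, go to the Pub/Sub schemas page.

    Go to the Schemas page

  2. Select the schema that you want to view from the list.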

gcloud

gcloud beta pubsub schemas describe SCHEMA_NAME

REST

To get the details of a schema, send a GET request like the following:

GET https://pubsub.googleapis.com/v1/SCHEMA_NAME

If successful, the response body contains an instance of the Schema resource.
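
SCHEMA_NAME in the request above is the full resource name of the schema, not just its ID. The following Python sketch assembles the name and the request URL from a project ID and a schema ID. It assumes the `projects/PROJECT_ID/schemas/SCHEMA_ID` format that appears in the create request earlier on this page; the function names and the sample IDs are hypothetical.

```python
def schema_name(project_id, schema_id):
    """Build the full resource name of a schema."""
    return f"projects/{project_id}/schemas/{schema_id}"


def schema_url(name):
    """Build the REST URL used by the GET and DELETE requests."""
    return f"https://pubsub.googleapis.com/v1/{name}"


# Hypothetical project and schema IDs, for illustration only.
name = schema_name("your-project-id", "your-schema-id")
print(name)
print(schema_url(name))
```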

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto schema = client.GetSchema(pubsub::Schema(project_id, schema_id),
                                 google::pubsub::v1::FULL);
  if (!schema) throw std::runtime_error(schema.status().message());

  std::cout << "The schema exists and its metadata is: "
            << schema->DebugString() << "\n";
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class GetSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    getSchemaExample(projectId, schemaId);
  }

  public static void getSchemaExample(String projectId, String schemaId) throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema = schemaServiceClient.getSchema(schemaName);

      System.out.println("Got a schema:\n" + schema);

    } catch (NotFoundException e) {
      System.out.println(schemaName + " not found.");
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    result = schema_client.get_schema(request={"name": schema_path})
    print(f"Got a schema:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id

puts "Schema #{schema.name} retrieved."

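Listing schemas

You can list the schemas in a Google Cloud project using the Cloud Console, the gcloud tool, or the Pub/Sub API:

Console

In the Cloud Console, go to the Pub/Sub schemas page. This page lists all schemas in the current project.

Go to the Schemas page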

gcloud

gcloud beta pubsub schemas list

REST

To list the schemas in a project, send a GET request like the following:

GET https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas

If successful, the response body contains a JSON object with all the schemas in the project.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id) {
  for (auto const& schema :
       client.ListSchemas(project_id, google::pubsub::v1::FULL)) {
    if (!schema) throw std::runtime_error(schema.status().message());
    std::cout << "Schema: " << schema->DebugString() << "\n";
  }
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import java.io.IOException;

public class ListSchemasExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";

    listSchemasExample(projectId);
  }

  public static void listSchemasExample(String projectId) throws IOException {
    ProjectName projectName = ProjectName.of(projectId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
      for (Schema schema : schemaServiceClient.listSchemas(projectName).iterateAll()) {
        System.out.println(schema);
      }
      System.out.println("Listed schemas.");
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"

project_path = f"projects/{project_id}"
schema_client = SchemaServiceClient()

for schema in schema_client.list_schemas(request={"parent": project_path}):
    print(schema)

print("Listed schemas.")

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schemas = pubsub.schemas

puts "Schemas in project:"
schemas.each do |schema|
  puts schema.name
end

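Deleting a schema

You can delete a schema using the Cloud Console, the gcloud tool, or the Pub/Sub API:

Console

  1. In the Cloud Console, go to the Pub/Sub schemas page.

    Go to the Schemas page

  2. Select the schema that you want to delete from the list.

  3. Click Delete.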

gcloud

gcloud beta pubsub schemas delete SCHEMA_NAME

REST

To delete a schema, send a DELETE request like the following:

DELETE https://pubsub.googleapis.com/v1/SCHEMA_NAME

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

namespace pubsub = google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto status = client.DeleteSchema(pubsub::Schema(project_id, schema_id));
  // Note that kNotFound is a possible result when the library retries.
  if (status.code() == google::cloud::StatusCode::kNotFound) {
    std::cout << "The schema was not found\n";
    return;
  }
  if (!status.ok()) throw std::runtime_error(status.message());

  std::cout << "Schema successfully deleted\n";
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class DeleteSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    deleteSchemaExample(projectId, schemaId);
  }

  public static void deleteSchemaExample(String projectId, String schemaId) throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      schemaServiceClient.deleteSchema(schemaName);

      System.out.println("Deleted a schema: " + schemaName);

    } catch (NotFoundException e) {
      System.out.println(schemaName + " not found.");
    }
  }
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    schema_client.delete_schema(request={"name": schema_path})
    print(f"Deleted a schema:\n{schema_path}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id
schema.delete

puts "Schema #{schema_id} deleted."

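Validating messages against a schema

You can validate that a message conforms to a particular schema either before or after the schema resource is created. Applying a schema to a topic cannot be undone, so validating first helps ensure that the messages you intend to send through the schema-associated topic will actually match.

To validate a message against a schema that has not been created yet, pass in the schema type and definition: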

gcloud

gcloud beta pubsub schemas validate-message \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE

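Where:

  • SCHEMA_TYPE is either AVRO or PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.
  • MESSAGE_ENCODING is either JSON or BINARY.
  • MESSAGE is the message to validate. A valid message must be encoded according to the specified MESSAGE_ENCODING and must follow the specified SCHEMA_DEFINITION.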

REST

Request:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/:validateMessage
Authorization: Bearer $(gcloud auth application-default print-access-token)

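Specify the following fields in the request body:

{
  "schema": {
    "definition": SCHEMA_DEFINITION,
    "type": SCHEMA_TYPE
  },
  "encoding": MESSAGE_ENCODING,
  "message": MESSAGE
}

Where:

  • SCHEMA_TYPE is either AVRO or PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.
  • MESSAGE_ENCODING is either JSON or BINARY.
  • MESSAGE is the base64-encoded message to validate. A valid message must be encoded according to the specified MESSAGE_ENCODING and must follow the specified SCHEMA_DEFINITION.

If the request succeeds, the response is an empty JSON object.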
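
The message field carries the message bytes as base64, whichever MESSAGE_ENCODING is used. The following Python sketch builds the request body shown above for a JSON-encoded message. The function name, the sample message, and the shortened Avro definition are hypothetical; the field names follow the request body above.

```python
import base64
import json


def build_validate_message_body(schema_type, definition, encoding, message_bytes):
    """Build the validateMessage request body for a schema not yet created."""
    if schema_type not in ("AVRO", "PROTOCOL_BUFFER"):
        raise ValueError(f"unsupported schema type: {schema_type}")
    if encoding not in ("JSON", "BINARY"):
        raise ValueError(f"unsupported message encoding: {encoding}")
    return {
        "schema": {"definition": definition, "type": schema_type},
        "encoding": encoding,
        # The raw message bytes are base64-encoded for transport.
        "message": base64.b64encode(message_bytes).decode("ascii"),
    }


# Hypothetical single-field Avro schema and a matching JSON-encoded message.
definition = json.dumps({
    "type": "record",
    "name": "Avro",
    "fields": [{"name": "StringField", "type": "string"}],
})
message = json.dumps({"StringField": "hello"}).encode("utf-8")

body = build_validate_message_body("AVRO", definition, "JSON", message)
print(json.dumps(body, indent=2))
```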

To validate a message against an existing schema, pass in the schema name:

gcloud

gcloud beta pubsub schemas validate-message \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE \
        --schema-name=SCHEMA_NAME

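Where:

  • SCHEMA_NAME is the name of an existing schema.
  • MESSAGE_ENCODING is either JSON or BINARY.
  • MESSAGE is the message to validate. A valid message must be encoded according to the specified MESSAGE_ENCODING and must follow the schema definition of SCHEMA_NAME.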

REST

Request:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/:validateMessage
Authorization: Bearer $(gcloud auth application-default print-access-token)

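Specify the following fields in the request body:

{
  "schema": {
    "name": SCHEMA_NAME
  },
  "encoding": MESSAGE_ENCODING,
  "message": MESSAGE
}

Where:

  • SCHEMA_NAME is the name of an existing schema.
  • MESSAGE_ENCODING is either JSON or BINARY.
  • MESSAGE is the base64-encoded message to validate. A valid message must be encoded according to the specified MESSAGE_ENCODING and must follow the schema definition of SCHEMA_NAME.

If the request succeeds, the response is an empty JSON object.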