スキーマの作成と管理

このページでは、Pub/Sub トピックのスキーマを作成、管理する方法を説明します。

スキーマとは、Pub/Sub が適用するパブリッシャーとサブスクライバー間の契約の作成時に準拠すべきメッセージの形式です。また、メッセージ タイプと権限を監視する中央機関を作成することで、組織内のデータ ストリームのチーム間消費を促進します。

Pub/Sub スキーマは、メッセージ内のフィールドの名前とデータ型を定義します。スキーマをバージョニングされたスタンドアロン リソースとして作成し、複数の Pub/Sub トピックにスキーマを関連付けて、パブリッシュされたメッセージの構造を検証できます。

スキーマを作成する

スキーマを作成して 1 つ以上のトピックを割り当てることも、トピックの作成中にスキーマを作成することもできます。スキーマをトピックに割り当てた後、パブリッシャーからトピックが受信するメッセージはすべてそのスキーマに準拠する必要があります。

スキーマを作成するには、Cloud Console、gcloud ツール、Pub/Sub API を使用します。

Console

スキーマを作成するには、次の操作を行います。

  1. Cloud Console で、[Pub/Sub スキーマ] ページに移動します。

    スキーマページに移動

  2. [スキーマを作成] をクリックします。

  3. [スキーマ ID] フィールドにスキーマの ID を入力します。

  4. [スキーマタイプ] で [Avro] または [プロトコル バッファ] を選択します。詳細については、下記のスキーマタイプをご覧ください。

  5. [スキーマ定義] フィールドに、スキーマのプロトコル バッファ定義の Avro を入力します。

  6. [作成] をクリックしてスキーマを保存します。

gcloud

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

ここで

  • SCHEMA_TYPEAVROPROTOCOL_BUFFER のどちらかです。
  • SCHEMA_DEFINITION は、選択したスキーマタイプに従って形式指定されたスキーマの定義を含む string です。

REST

スキーマを作成するには、次のように POST リクエストを送信します。

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

リクエスト本文に次のフィールドを指定します。

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
}

ここで

  • SCHEMA_TYPEAVROPROTOCOL_BUFFER のどちらかです。
  • SCHEMA_DEFINITION は、選択したスキーマタイプに従って形式指定されたスキーマの定義を含む文字列です。

レスポンスの本文には、スキーマ リソースの JSON 表現を含める必要があります。次に例を示します。

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Avro
namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"js({
    "type": "record",
    "name": "State",
    "namespace": "utilities",
    "doc": "A list of states in the United States of America.",
    "fields": [
      {
        "name": "name",
        "type": "string",
        "doc": "The common name of the state."
      },
      {
        "name": "post_abbr",
        "type": "string",
        "doc": "The postal code abbreviation of the state."
      }
    ]
  })js";
  auto schema = client.CreateAvroSchema(pubsub::Schema(project_id, schema_id),
                                        kDefinition);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::runtime_error(schema.status().message());

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}
Proto
namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto constexpr kDefinition = R"pfile(
      syntax = "proto3";
      package google.cloud.pubsub.samples;

      message State {
        string name = 1;
        string post_abbr = 2;
      }
      )pfile";
  auto schema = client.CreateProtobufSchema(
      pubsub::Schema(project_id, schema_id), kDefinition);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) return;  // TODO(#4792) - protobuf schema support in emulator
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

Avro

using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateAvroSchemaSample
{
    public Schema CreateAvroSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            Name = schemaName.ToString(),
            Type = Schema.Types.Type.Avro,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            Parent = "projects/" + projectId,
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}
Proto

using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateProtoSchemaSample
{
    public Schema CreateProtoSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            Name = schemaName.ToString(),
            Type = Schema.Types.Type.ProtocolBuffer,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            Parent = "projects/" + projectId,
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Avro
import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// createAvroSchema creates a schema resource from a JSON-formatted Avro schema file.
func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	avscSource, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %v", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}
Proto
import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// createProtoSchema creates a schema resource from a schema proto file.
func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	protoSource, err := ioutil.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %v", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Avro

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static void createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}
プロトコル バッファ

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static void createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

Avro
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaName = 'YOUR_SCHEMA_NAME';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaName, avscFile) {
  const definition = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaName,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}
プロトコル バッファ
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaName = 'YOUR_SCHEMA_NAME';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');
const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaName, protoFile) {
  const definition = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaName,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

Avro
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
 * Create a Schema with an AVRO definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $avscFile
 */
function create_avro_schema($projectId, $schemaId, $avscFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = file_get_contents($avscFile);
    $schema = $pubsub->createSchema($schemaId, Type::AVRO, $definition);

    printf('Schema %s created.', $schema->name());
}
プロトコル バッファ
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
 * Create a Schema with an Protocol Buffer definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $protoFile
 */
function create_proto_schema($projectId, $schemaId, $protoFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = file_get_contents($protoFile);
    $schema = $pubsub->createSchema($schemaId, Type::PROTOCOL_BUFFER, $definition);

    printf('Schema %s created.', $schema->name());
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Avro
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")
プロトコル バッファ
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

Avro
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."
プロトコル バッファ
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

スキーマを使用するには、スキーマをトピックに割り当てる必要があります。1 つのスキーマを任意の数のトピックに関連付けることができます。

スキーマタイプ

スキーマは、次のフレームワークを使用して作成できます。

たとえば、次のスキーマでは、string フィールド、float フィールド、boolean フィールドを含むメッセージを定義しています。

Avro

{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
   {
     "name" : "StringField",
     "type" : "string"
   },
   {
     "name" : "FloatField",
     "type" : "float"
   },
   {
     "name" : "BooleanField",
     "type" : "boolean"
   },
 ]
}

プロトコル バッファ

syntax = "proto3";
message ProtocolBuffer {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}

Pub/Sub でスキーマを機能させるために、1 つの最上位タイプのみを定義できます。他の型を参照するインポート ステートメントもサポートされていません。

スキーマの詳細を取得する

Cloud Console、gcloud ツール、Pub/Sub API を使用して、スキーマの名前を基に、スキーマの詳細を取得できます。

Console

  1. Cloud Console で、[Pub/Sub スキーマ] ページに移動します。

    スキーマページに移動

  2. 表示するスキーマをリストから選択します。

gcloud

gcloud pubsub schemas describe SCHEMA_NAME

REST

スキーマの詳細を取得するには、次のような GET リクエストを送信します。

GET https://pubsub.googleapis.com/v1/SCHEMA_NAME

成功した場合、レスポンスの本文にはスキーマクラスのインスタンスが含まれます。

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto schema = client.GetSchema(pubsub::Schema(project_id, schema_id),
                                 google::pubsub::v1::FULL);
  if (!schema) throw std::runtime_error(schema.status().message());

  std::cout << "The schema exists and its metadata is: "
            << schema->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;

public class GetSchemaSample
{
    public Schema GetSchema(string projectId, string schemaId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        SchemaName schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        GetSchemaRequest request = new GetSchemaRequest
        {
            Name = schemaName.ToString(),
            View = SchemaView.Full
        };

        return schemaService.GetSchema(request);
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func getSchema(w io.Writer, projectID, schemaID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	// Retrieve the full schema view. If you don't want to retrive the
	// definition, pass in pubsub.SchemaViewBasic which retrieves
	// just the name and type of the schema.
	s, err := client.Schema(ctx, schemaID, pubsub.SchemaViewFull)
	if err != nil {
		return fmt.Errorf("client.Schema: %v", err)
	}
	fmt.Fprintf(w, "Got schema: %#v\n", s)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class GetSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    getSchemaExample(projectId, schemaId);
  }

  public static void getSchemaExample(String projectId, String schemaId) throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema = schemaServiceClient.getSchema(schemaName);

      System.out.println("Got a schema:\n" + schema);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const schemaName = 'YOUR_SCHEMA_NAME';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function getSchema(schemaName) {
  const schema = pubSubClient.schema(schemaName);
  const info = await schema.get();
  const fullName = await schema.getName();
  console.log(`Schema ${fullName} info: ${JSON.stringify(info, null, 4)}.`);
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Get a schema.
 *
 * @param string $projectId
 * @param string $schemaId
 */
function get_schema($projectId, $schemaId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);
    $schema->info();

    printf('Schema %s retrieved', $schema->name());
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    result = schema_client.get_schema(request={"name": schema_path})
    print(f"Got a schema:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id

puts "Schema #{schema.name} retrieved."

スキーマの一覧表示

Cloud Console、gcloud ツール、Pub/Sub API を使用して、Google Cloud プロジェクトのスキーマを一覧表示できます。

Console

Cloud Console で、[Pub/Sub スキーマ] ページに移動します。このページには、現在のプロジェクト内のすべてのスキーマの一覧が含まれています。

スキーマページに移動

gcloud

gcloud pubsub schemas list

REST

プロジェクト内のスキーマを一覧表示するには、次のような GET リクエストを送信します。

GET https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas

成功した場合、レスポンスの本文にはプロジェクト内のすべてのスキーマを含む JSON オブジェクトが含まれます。

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id) {
  for (auto const& schema :
       client.ListSchemas(project_id, google::pubsub::v1::FULL)) {
    if (!schema) throw std::runtime_error(schema.status().message());
    std::cout << "Schema: " << schema->DebugString() << "\n";
  }
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;

public class ListSchemasSample
{
    public IEnumerable<Schema> ListSchemas(string projectId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        ProjectName projectName = ProjectName.FromProject(projectId);
        var schemas = schemaService.ListSchemas(projectName);
        return schemas;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func listSchemas(w io.Writer, projectID string) ([]*pubsub.SchemaConfig, error) {
	// projectID := "my-project-id"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	var schemas []*pubsub.SchemaConfig

	schemaIter := client.Schemas(ctx, pubsub.SchemaViewFull)
	for {
		sc, err := schemaIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("schemaIter.Next: %v", err)
		}
		fmt.Fprintf(w, "Got schema: %#v\n", sc)
		schemas = append(schemas, sc)
	}

	fmt.Fprintf(w, "Got %d schemas", len(schemas))
	return schemas, nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import java.io.IOException;

public class ListSchemasExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";

    listSchemasExample(projectId);
  }

  public static void listSchemasExample(String projectId) throws IOException {
    ProjectName projectName = ProjectName.of(projectId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
      for (Schema schema : schemaServiceClient.listSchemas(projectName).iterateAll()) {
        System.out.println(schema);
      }
      System.out.println("Listed schemas.");
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。


// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listSchemas() {
  for await (const s of pubSubClient.listSchemas()) {
    console.log(await s.name);
  }
  console.log('Listed schemas.');
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * List schemas in the project.
 *
 * @param string $projectId
 */
function list_schemas($projectId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schemas = $pubsub->schemas();
    foreach ($schemas as $schema) {
        printf('Schema name: %s' . PHP_EOL, $schema->name());
    }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"

project_path = f"projects/{project_id}"
schema_client = SchemaServiceClient()

for schema in schema_client.list_schemas(request={"parent": project_path}):
    print(schema)

print("Listed schemas.")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schemas = pubsub.schemas

puts "Schemas in project:"
schemas.each do |schema|
  puts schema.name
end

スキーマの削除

スキーマを削除するには、Cloud Console、gcloud ツール、Pub/Sub API を使用します。

Console

  1. Cloud Console で、[Pub/Sub スキーマ] ページに移動します。

    スキーマページに移動

  2. 削除するスキーマをリストから選択します。

  3. [削除] をクリックします。

gcloud

gcloud pubsub schemas delete SCHEMA_NAME

REST

スキーマを削除するには、次のように DELETE リクエストを送信します。

DELETE https://pubsub.googleapis.com/v1/SCHEMA_NAME

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaAdminClient client, std::string const& project_id,
   std::string const& schema_id) {
  auto status = client.DeleteSchema(pubsub::Schema(project_id, schema_id));
  // Note that kNotFound is a possible result when the library retries.
  if (status.code() == google::cloud::StatusCode::kNotFound) {
    std::cout << "The schema was not found\n";
    return;
  }
  if (!status.ok()) throw std::runtime_error(status.message());

  std::cout << "Schema successfully deleted\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;

public class DeleteSchemaSample
{
    public void DeleteSchema(string projectId, string schemaId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        SchemaName schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        schemaService.DeleteSchema(schemaName);
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func deleteSchema(w io.Writer, projectID, schemaID string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %v", err)
	}
	defer client.Close()

	if err := client.DeleteSchema(ctx, schemaID); err != nil {
		return fmt.Errorf("client.DeleteSchema: %v", err)
	}
	fmt.Fprintf(w, "Deleted schema: %s", schemaID)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;

public class DeleteSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";

    deleteSchemaExample(projectId, schemaId);
  }

  public static void deleteSchemaExample(String projectId, String schemaId) throws IOException {
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      schemaServiceClient.deleteSchema(schemaName);

      System.out.println("Deleted a schema:" + schemaName);

    } catch (NotFoundException e) {
      System.out.println(schemaName + "not found.");
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const schemaName = 'YOUR_SCHEMA_NAME';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function deleteSchema(schemaName) {
  const schema = pubSubClient.schema(schemaName);
  const name = await schema.getName();
  await schema.delete();
  console.log(`Schema ${name} deleted.`);
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Delete a schema.
 *
 * @param string $projectId
 * @param string $schemaId
 */
function delete_schema($projectId, $schemaId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    if ($schema->exists()) {
        $schema->delete();

        printf('Schema %s deleted.', $schema->name());
    } else {
        printf('Schema %s does not exist.', $schema->name());
    }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    schema_client.delete_schema(request={"name": schema_path})
    print(f"Deleted a schema:\n{schema_path}")
except NotFound:
    print(f"{schema_id} not found.")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# schema_id = "your-schema-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

schema = pubsub.schema schema_id
schema.delete

puts "Schema #{schema_id} deleted."

メッセージ スキーマの検証

スキーマ リソースの作成前または作成後に、メッセージが特定のスキーマに準拠しているかを検証できます。これは、スキーマに関連付けられているトピックを介して送信しようとしているメッセージがスキーマを適用する前に一致するかを確認できるので便利です(この操作は元に戻せません)。

まだ作成されていないスキーマと比較してメッセージを検証するには、スキーマ タイプと定義を渡します。

gcloud

gcloud pubsub schemas validate-message \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE

ここで

  • SCHEMA_TYPEAVROPROTOCOL_BUFFER のどちらかです。
  • SCHEMA_DEFINITION は、選択したスキーマタイプに従って形式指定されたスキーマの定義を含む string です。
  • MESSAGE_ENCODINGJSONBINARY のどちらかです。
  • MESSAGE は検証するメッセージです。これを有効にするには、指定された MESSAGE_ENCODING に従ってエンコードし、指定された SCHEMA_DEFINITION に準拠する必要があります。

REST

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/:validateMessage
Authorization: Bearer $(gcloud auth application-default print-access-token)

リクエスト本文に次のフィールドを指定します。

{
  "schema": {
    "definition": SCHEMA_DEFINITION
    "type": SCHEMA_TYPE
  }
  "encoding": MESSAGE_ENCODING
  "message": MESSAGE
}

ここで

  • SCHEMA_TYPEAVROPROTOCOL_BUFFER のどちらかです。
  • SCHEMA_DEFINITION は、選択したスキーマタイプに従って形式指定されたスキーマの定義を含む文字列です。
  • MESSAGE_ENCODINGJSONBINARY のどちらかです。
  • MESSAGE は、base64 でエンコードされた検証するメッセージです。これを有効にするには、指定された MESSAGE_ENCODING に従ってエンコードし、指定された SCHEMA_DEFINITION に準拠する必要があります。

リクエストが成功した場合のレスポンスは空の JSON オブジェクトです。

既存のスキーマと比較してメッセージを検証するには、スキーマ名を渡します。

gcloud

gcloud pubsub schemas validate-message \
        --message-encoding=MESSAGE_ENCODING \
        --message=MESSAGE \
        --schema-name=SCHEMA_NAME

ここで

  • SCHEMA_NAME は、既存のサブネットの名前です。
  • MESSAGE_ENCODINGJSONBINARY のどちらかです。
  • MESSAGE は検証するメッセージです。有効にするには、指定した MESSAGE_ENCODING に応じてエンコードし、SCHEMA_NAME のスキーマ定義を遵守する必要があります。

REST

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/:validateMessage
Authorization: Bearer $(gcloud auth application-default print-access-token)

リクエスト本文に次のフィールドを指定します。

{
  "schema": {
    "name": SCHEMA_NAME
  }
  "encoding": MESSAGE_ENCODING
  "message": MESSAGE
}

ここで

  • SCHEMA_NAME は、既存のサブネットの名前です。
  • MESSAGE_ENCODINGJSONBINARY のどちらかです。
  • MESSAGE は、base64 でエンコードされた検証するメッセージです。これを有効にするには、指定された MESSAGE_ENCODING に従ってエンコードし、指定された SCHEMA_DEFINITION に準拠する必要があります。

リクエストが成功した場合のレスポンスは空の JSON オブジェクトです。