スキーマをトピックに関連付ける

このドキュメントでは、Pub/Sub トピックのスキーマを関連付ける方法について説明します。

準備

必要なロールと権限

スキーマの関連付けと管理に必要な権限を取得するには、管理者にPub/Sub 編集者 roles/pubsub.editor )プロジェクトに対する IAM ロールを付与するよう依頼してください。ロールの付与の詳細については、アクセスの管理をご覧ください。

この事前定義ロールには、スキーマの関連付けと管理に必要な権限が含まれています。必要な権限を正確に確認するには、[必要な権限] セクションを開いてください。

必要な権限

スキーマを関連付けて管理するには、次の権限が必要です。

  • スキーマを作成します: pubsub.schemas.create
  • スキーマをトピックに添付します: pubsub.schemas.attach
  • スキーマのリビジョンを commit します: pubsub.schemas.commit
  • スキーマまたはスキーマ リビジョンを削除します: pubsub.schemas.delete
  • スキーマまたはスキーマのリビジョンを取得します: pubsub.schemas.get
  • スキーマを一覧表示します: pubsub.schemas.list
  • スキーマのリビジョンを一覧表示します: pubsub.schemas.listRevisions
  • スキーマをロールバックします: pubsub.schemas.rollback
  • メッセージを検証します: pubsub.schemas.validate
  • スキーマの IAM ポリシーを取得します: pubsub.schemas.getIamPolicy
  • スキーマの IAM ポリシーを構成します: pubsub.schemas.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

ユーザー、グループ、ドメイン、サービス アカウントなどのプリンシパルにロールと権限を付与できます。あるプロジェクトにスキーマを作成し、別のプロジェクトにあるトピックにアタッチできます。プロジェクトごとに必要な権限があることを確認します。

スキーマをトピックに関連付けるためのガイドライン

トピックを作成または編集するときに、スキーマをトピックに関連付けることができます。 ここでは、スキーマをトピックに関連付けるためのガイドラインを示します。

  • スキーマを 1 つ以上のトピックに関連付けることができます。

    スキーマがトピックに関連付けられた後、トピックがパブリッシャーから受信するすべてのメッセージはそのスキーマに従う必要があります。

  • スキーマをトピックに関連付ける場合は、BINARY または JSON として公開されるメッセージのエンコードも指定する必要があります。Avro スキーマで JSON を使用する場合は、ユニオンのエンコード ルールに十分注意してください。

  • トピックに関連付けられたスキーマにリビジョンがある場合、メッセージはエンコードと一致している必要があり、使用可能な範囲内のリビジョンに対して検証する必要があります。検証を行わない場合、メッセージは公開されません。

    リビジョンは作成日時に基づいて、日付の新しい順に試行されます。スキーマ リビジョンを作成するには、スキーマ リビジョンを commit するをご覧ください。

メッセージ スキーマの検証ロジック

スキーマをトピックに関連付ける際、スキーマにリビジョンがある場合は、使用するリビジョンのサブセット範囲を指定できます。範囲を指定しない場合は、範囲全体が検証に使用されます。

リビジョンを [使用可能な最初のリビジョン] として指定しない場合、スキーマの既存の最も古いリビジョンが検証に使用されます。リビジョンを [使用可能な最後のリビジョン] として指定しない場合、スキーマの既存の最新のリビジョンが使用されます。

トピック T にアタッチされているスキーマ S の例を見てみましょう。

スキーマ S のリビジョン ID ABCD が順番に作成されます。ここで、A は最初のリビジョンまたは最も古いリビジョンです。スキーマが同一でないか、または既存のスキーマをロールバックしていません。

  • [使用可能な最初のリビジョン] フィールドを B に設定した場合にのみ、スキーマ A のみに準拠するメッセージは拒否され、スキーマ BCD に準拠するメッセージは受け入れられます。

  • [使用可能な最後のリビジョン] フィールドを C に設定した場合にのみ、スキーマ ABC に準拠するメッセージが受け入れられ、スキーマ D のみに準拠するメッセージは拒否されます。

  • [使用可能な最初のリビジョン] フィールドを B、[使用可能な最後のリビジョン] フィールドを C に設定した場合、スキーマ BC に準拠するメッセージが受け入れられます。

  • 最初と最後のリビジョンを同じリビジョン ID に設定することもできます。この場合、そのリビジョンに準拠するメッセージのみが受け入れられます。

トピックの作成時にスキーマを作成して関連付ける

スキーマを使用してトピックを作成するには、Google Cloud コンソール、gcloud CLI、Pub/Sub API、または Cloud クライアント ライブラリを使用します。

Console

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] に、トピックの ID を入力します。

    トピックに名前を付けるには、ガイドラインをご覧ください。

  4. [スキーマを使用する] チェックボックスをオンにします。

    残りのフィールドはデフォルト設定のままにします。

    スキーマは作成することも、既存のスキーマを使用することもできます。

  5. スキーマを作成する場合は、次のようにします。

    1. [Pub/Sub スキーマを選択] で、[新しいスキーマを作成] を選択します。

    [スキーマの作成] ページがセカンダリ タブに表示されます。

    スキーマを作成するの手順に従います。

    1. [トピックを作成] タブに戻り、[更新] をクリックします。

    2. [Pub/Sub スキーマを選択] フィールドでスキーマを検索します。

    3. メッセージ エンコードとして [JSON] または [バイナリ] を選択します。

    作成したスキーマにはリビジョン ID があります。スキーマ リビジョンを commit するで説明したように、追加のスキーマ リビジョンを作成できます。

  6. すでに作成されたスキーマを関連付ける場合は、次の手順に沿って操作します。

    1. [Pub/Sub スキーマを選択] で、既存のスキーマを選択します。

    2. メッセージ エンコードとして [JSON] または [バイナリ] を選択します。

  7. 省略可: 選択したスキーマにリビジョンがある場合、[リビジョン範囲] で、[使用可能な最初のリビジョン] と [使用可能な最後のリビジョン] のプルダウン メニューを使用します。

両方のフィールドを指定することも、1 つのみを指定することも、要件に基づいてデフォルト設定を保持することもできます。

  1. 残りのフィールドはデフォルト設定のままにします。

  2. [作成] をクリックしてトピックを保存し、選択したスキーマに割り当てます。

gcloud

以前に作成したスキーマが割り当てられたトピックを作成するには、次の gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

ここで

  • TOPIC_ID は、作成するトピックの ID です。
  • ENCODING_TYPE は、スキーマに対して検証されるメッセージのエンコードです。この値は JSON または BINARY に設定する必要があります。
  • SCHEMA_ID は、既存のスキーマの ID です。
  • FIRST_REVISION_ID は、検証する最も古いリビジョンの ID です。
  • LAST_REVISION_ID は、検証する最新のリビジョンの ID です。

--first-revision-id--last-revision-id は省略可能です。

別の Google Cloud プロジェクトからスキーマを割り当てることもできます。

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

ここで

  • SCHEMA_PROJECT は、スキーマの Google Cloud プロジェクトのプロジェクト ID です。
  • TOPIC_PROJECT は、トピックの Google Cloud プロジェクトのプロジェクト ID です。

REST

トピックを作成するには、projects.topics.create メソッドを使用します。

リクエスト:

リクエストは、Authorization ヘッダー内のアクセス トークンにより認証を受ける必要があります。現在のアプリケーションのデフォルト認証情報のアクセス トークンを取得する場合は、gcloud auth application-default print-access-token を使用します。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

リクエストの本文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

ここで

  • PROJECT_ID はプロジェクト ID です。
  • TOPIC_ID はトピック ID です。
  • SCHEMA_NAME は、公開されたメッセージが検証される必要があるスキーマの名前です。形式は projects/PROJECT_ID/schemas/SCHEMA_ID です。
  • ENCODING_TYPE は、スキーマに対して検証されるメッセージのエンコードです。JSON または BINARY に設定する必要があります。
  • FIRST_REVISION_ID は、検証する最も古いリビジョンの ID です。
  • LAST_REVISION_ID は、検証する最新のリビジョンの ID です。

firstRevisionIdlastRevisionId は省略可能です。

対応:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

リクエストで指定されていない場合、firstRevisionIdlastRevisionId は両方とも省略されます。

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  google::pubsub::v1::Topic request;
  request.set_name(pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.mutable_schema_settings()->set_schema(
      pubsub::Schema(std::move(project_id), std::move(schema_id)).FullName());
  request.mutable_schema_settings()->set_encoding(
      encoding == "JSON" ? google::pubsub::v1::JSON
                         : google::pubsub::v1::BINARY);
  auto topic = client.CreateTopic(request);

  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaId, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            TopicName = topicName,
            SchemaSettings = new SchemaSettings
            {
                SchemaAsSchemaName = SchemaName.FromProjectSchema(projectId, schemaId),
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string, encoding pubsub.SchemaEncoding) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	// encoding := pubsub.EncodingJSON
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	tc := &pubsub.TopicConfig{
		SchemaSettings: &pubsub.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
			Encoding:        encoding,
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, tc)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %w", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId: string,
  schemaNameOrId: string,
  encodingType: 'BINARY' | 'JSON'
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :binary

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.create_topic topic_id, schema_name: schema_id, message_encoding: message_encoding

puts "Topic #{topic.name} created."

トピックに関連付けられたスキーマを編集する

トピックを編集して、スキーマのアタッチ、スキーマの削除、メッセージの検証に使用するリビジョン範囲の更新を行うことができます。一般に、使用しているスキーマに対して変更を計画している場合は、新しいリビジョンを commit し、トピックに使用されるリビジョンの範囲を更新します。

トピックに関連付けられたスキーマを編集するには、Google Cloud コンソール、gcloud CLI、Pub/Sub API、または Cloud クライアント ライブラリを使用します。

Console

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. トピックの [トピック ID] をクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. スキーマには次の変更を加えることができます。

    変更が反映されるまでに数分かかることがあります。

    • スキーマをトピックから削除する場合は、[トピックの編集] ページで [スキーマを使用する] チェックボックスをオフにします。

    • スキーマを変更する場合は、[スキーマ] セクションでスキーマの名前を選択します。

    必要に応じて他のフィールドを更新します。

    • リビジョン範囲を更新する場合、[リビジョン範囲] で、[使用可能な最初のリビジョン] と [使用可能な最後のリビジョン] のプルダウン メニューを使用します。

    両方のフィールドを指定することも、1 つのみを指定することも、要件に基づいてデフォルト設定を保持することもできます。

  5. [更新] をクリックして変更を保存します。

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

ここで

  • TOPIC_ID は、作成するトピックの ID です。
  • ENCODING_TYPE は、スキーマに対して検証されるメッセージのエンコードです。この値は JSON または BINARY に設定する必要があります。
  • SCHEMA_NAME は、既存のサブネットの名前です。
  • FIRST_REVISION_ID は、検証する最も古いリビジョンの ID です。
  • LAST_REVISION_ID は、検証する最新のリビジョンの ID です。

--first-revision-id--last-revision-id は省略可能です。

REST

トピックを更新するには、projects.topics.patch メソッドを使用します。

リクエスト:

リクエストは、Authorization ヘッダー内のアクセス トークンにより認証を受ける必要があります。現在のアプリケーションのデフォルト認証情報のアクセス トークンを取得する場合は、gcloud auth application-default print-access-token を使用します。

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

リクエストの本文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

ここで

  • PROJECT_ID はプロジェクト ID です。
  • TOPIC_ID はトピック ID です。
  • SCHEMA_NAME は、公開されたメッセージが検証される必要があるスキーマの名前です。形式は projects/PROJECT_ID/schemas/SCHEMA_ID です。
  • ENCODING_TYPE は、スキーマに対して検証されるメッセージのエンコードです。JSON または BINARY に設定する必要があります。
  • FIRST_REVISION_ID は、検証する最も古いリビジョンの ID です。
  • LAST_REVISION_ID は、検証する最新のリビジョンの ID です。

firstRevisionIdlastRevisionId は省略可能です。

対応:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

firstRevisionIdlastRevisionId はどちらも、更新後には設定されません。

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& first_revision_id,
   std::string const& last_revision_id) {
  google::pubsub::v1::UpdateTopicRequest request;
  auto* request_topic = request.mutable_topic();
  request_topic->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  request_topic->mutable_schema_settings()->set_first_revision_id(
      first_revision_id);
  request_topic->mutable_schema_settings()->set_last_revision_id(
      last_revision_id);
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.first_revision_id";
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.last_revision_id";
  auto topic = client.UpdateTopic(request);

  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	t := client.Topic(topicID)

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings.
	tc := pubsub.TopicConfigToUpdate{
		SchemaSettings: &pubsub.SchemaSettings{
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
		},
	}

	gotTopicCfg, err := t.Update(ctx, tc)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")
0

次のステップ