BigQuery サブスクリプションの作成

このドキュメントでは、BigQuery サブスクリプションの作成方法について説明します。BigQuery サブスクリプションを作成するには、Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用できます。

準備

このドキュメントを読む前に、次の内容をよく理解しておいてください。

BigQuery サブスクリプションを作成する前に、Pub/Sub と BigQuery に精通しているだけでなく、次の前提条件を満たしていることを確認してください。

  • BigQuery テーブルが存在している。あるいは、このドキュメントの後半のセクションで説明するように、BigQuery サブスクリプションの作成時に作成することもできます。

  • Pub/Sub トピックのスキーマと BigQuery テーブルの間の互換性。互換性のない BigQuery テーブルを追加すると、互換性に関連するエラー メッセージが出力されます。詳細については、スキーマの互換性をご覧ください。

必要なロールと権限

ロールと権限に関するガイドラインは次の一覧に示すとおりです。

  • サブスクリプションを作成するには、プロジェクト レベルでアクセス制御を構成する必要があります。

  • このセクションで後述するように、サブスクリプションとトピックが異なるプロジェクトにある場合は、リソースレベルの権限も必要です。

  • BigQuery サブスクリプションを作成するには、Pub/Sub サービス アカウントに特定の BigQuery テーブルへの書き込み権限が付与されている必要があります。これらの権限を付与する方法について詳しくは、このドキュメントの次のセクションをご覧ください。

  • プロジェクト内の BigQuery サブスクリプションを構成して、別のプロジェクトの BigQuery テーブルに書き込むことができます。

BigQuery サブスクリプションの作成に必要な権限を取得するには、管理者にプロジェクトに対する Pub/Sub 編集者roles/pubsub.editor)IAM ロールを付与するよう依頼してください。ロールの付与の詳細については、アクセス権の管理をご覧ください。

この事前定義ロールには、BigQuery サブスクリプションの作成に必要な権限が含まれています。必要な権限を正確に確認するには、[必要な権限] セクションを開いてください。

必要な権限

BigQuery サブスクリプションを作成するには、次の権限が必要です。

  • サブスクリプションから pull する: pubsub.subscriptions.consume
  • サブスクリプションを作成する: pubsub.subscriptions.create
  • サブスクリプションを削除する: pubsub.subscriptions.delete
  • サブスクリプションを取得する: pubsub.subscriptions.get
  • サブスクリプションを一覧表示する: pubsub.subscriptions.list
  • サブスクリプションを更新する: pubsub.subscriptions.update
  • サブスクリプションをトピックに接続する: pubsub.topics.attachSubscription
  • サブスクリプションの IAM ポリシーを取得する: pubsub.subscriptions.getIamPolicy
  • サブスクリプションの IAM ポリシーを構成する: pubsub.subscriptions.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

別のプロジェクトのトピックに関連付けられているプロジェクトで BigQuery サブスクリプションを作成する必要がある場合は、トピック管理者にトピックに対する Pub/Sub 編集者の (roles/pubsub.editor) IAM ロールも付与するよう依頼してください。

Pub/Sub サービス アカウントに BigQuery のロールを割り当てる

一部の Google Cloud サービスでは、ユーザーのリソースにアクセスするために、Google Cloud 管理のサービス アカウントを使用しています。これらのサービス アカウントは、サービス エージェントと呼ばれます。Pub/Sub は、プロジェクトごとに service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントを作成し、維持します。

BigQuery サブスクリプションを作成するには、Pub/Sub サービス アカウントに、特定の BigQuery テーブルへの書き込みとテーブル メタデータの読み取りを行う権限が必要です。

Pub/Sub サービス アカウントに BigQuery データ編集者(roles/bigquery.dataEditor)ロールを付与します。

  1. Google Cloud コンソールの [IAM] ページに移動します。

    IAM に移動

  2. [アクセス権を付与] をクリックします。

  3. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。サービス アカウントの形式は service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com です。たとえば、project-number=112233445566 を使用するプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com です。

  4. [ロールの割り当て] セクションで、[別のロールを追加] をクリックします。

  5. [ロールを選択] プルダウンに「BigQuery」と入力し、[BigQuery データ編集者のロール] を選択します。

  6. [保存] をクリックします。

BigQuery IAM の詳細については、BigQuery のロールと権限をご覧ください。

BigQuery のサブスクリプション プロパティ

BigQuery のサブスクリプションを構成するときに、次のプロパティを指定できます。

共通の特徴

すべてのサブスクリプションで設定できる共通のサブスクリプション プロパティについて学習します。

トピック スキーマを使用する

このオプションにより、Pub/Sub は、サブスクリプションが接続されている Pub/Sub トピックのスキーマを使用できます。さらに、Pub/Sub はメッセージ内のフィールドを BigQuery テーブル内の対応する列に書き込みます。

このオプションを使用する場合は、以下の追加要件を確認してください。

  • トピック スキーマ内のフィールドと BigQuery スキーマ内のフィールドは、同じ名前でなければならず、その型には相互に互換性が必要です。

  • トピック スキーマのオプション フィールドは BigQuery スキーマでも省略可能な必要があります。

  • トピック スキーマの必須フィールドは、BigQuery スキーマでは必要ありません。

  • BigQuery フィールドがトピック スキーマに存在しない場合、これらの BigQuery フィールドは NULLABLE モードにする必要があります。

  • トピック スキーマに、BigQuery スキーマに存在しない追加のフィールドがあり、このようなフィールドを削除できる場合は、[不明な項目を削除する] オプションを選択します。

  • サブスクリプション プロパティとして [トピック スキーマを使用する] または [テーブル スキーマを使用する] のいずれか 1 つのみを選択できます。

[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションを選択しない場合は、BigQuery テーブルに、BYTESSTRING、または JSON 型の data という列があることを確認してください。Pub/Sub は、この BigQuery 列にメッセージを書き込みます。

Pub/Sub トピック スキーマまたは BigQuery テーブル スキーマの変更は、BigQuery テーブルに書き込まれたメッセージにすぐに反映されない場合があります。たとえば、[不明なフィールドを削除する] オプションが有効で、フィールドが Pub/Sub スキーマには存在するが BigQuery スキーマにはない場合、BigQuery テーブルに書き込まれるメッセージには、BigQuery スキーマに追加した後も、フィールドが含まれていない場合があります。最終的に、スキーマが同期され、後続のメッセージにこのフィールドが含まれます。

BigQuery サブスクリプションで [テーブル スキーマを使用する] オプションを使用すると、BigQuery 変更データ キャプチャ(CDC)も利用できます。CDC は、既存の行を処理して変更を適用し、BigQuery テーブルを更新します。

この機能の詳細については、変更データ キャプチャを使用してテーブル更新をストリーミングするをご覧ください。

BigQuery サブスクリプションでこの機能を使用する方法については、BigQuery の変更データ キャプチャをご覧ください。

テーブル スキーマを使用する

このオプションにより、Pub/Sub は BigQuery テーブルのスキーマを使用して、JSON メッセージのフィールドを対応する列に書き込むことができます。このオプションを使用する場合は、次の追加要件を確認してください。

  • 公開されるメッセージは JSON 形式である必要があります。

  • サブスクリプションのトピックにスキーマが関連付けられている場合は、メッセージ エンコード プロパティを JSON に設定する必要があります。

  • メッセージに存在しない BigQuery フィールドがある場合、これらの BigQuery フィールドは NULLABLE モードにする必要があります。

  • BigQuery スキーマに存在しない追加のフィールドがメッセージにあり、このようなフィールドを削除できる場合は、[不明なフィールドを削除する] オプションを選択します。

  • JSON メッセージでは、DATEDATETIMETIMETIMESTAMP の値は、サポートされている表現に準拠する整数でなければなりません。

  • JSON メッセージでは、NUMERICBIGNUMERIC の値は、BigDecimalByteStringEncoder を使用してバイト エンコードする必要があります。

  • サブスクリプション プロパティとして [トピック スキーマを使用する] または [テーブル スキーマを使用する] のいずれか 1 つのみを選択できます。

[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションを選択しない場合は、BigQuery テーブルに、BYTESSTRING、または JSON 型の data という列があることを確認してください。Pub/Sub は、この BigQuery 列にメッセージを書き込みます。

BigQuery テーブル スキーマの変更は、BigQuery テーブルに書き込まれたメッセージにすぐに反映されない場合があります。 たとえば、[不明なフィールドを削除する] オプションが有効で、フィールドがメッセージには存在するが BigQuery スキーマにはない場合、BigQuery テーブルに書き込まれるメッセージには、BigQuery スキーマに追加した後も、フィールドが含まれていない場合があります。最終的に、スキーマが同期され、後続のメッセージにこのフィールドが含まれます。

BigQuery サブスクリプションで [テーブル スキーマを使用する] オプションを使用すると、BigQuery 変更データ キャプチャ(CDC)も利用できます。 CDC は、既存の行を処理して変更を適用し、BigQuery テーブルを更新します。

この機能の詳細については、変更データ キャプチャを使用してテーブル更新をストリーミングするをご覧ください。

BigQuery サブスクリプションでこの機能を使用する方法については、BigQuery 変更データ キャプチャをご覧ください。

不明な項目を削除する

このオプションは、[トピック スキーマを使用する] または [テーブル スキーマを使用する] オプションで使用します。このオプションを使用すると、Pub/Sub は、トピック スキーマやメッセージには存在するが BigQuery スキーマには存在しないフィールドをドロップできます。[不明な項目を削除する] が設定されていない場合、余分な項目を含むメッセージは BigQuery に書き込まれず、サブスクリプション バックログに残ります。サブスクリプションがエラー状態になります。

メタデータを書き込む

このオプションを使用すると、Pub/Sub が各メッセージのメタデータを BigQuery テーブルの追加の列に書き込むようにする場合は、このオプションを選択できます。それ以外の場合、メタデータは BigQuery テーブルに書き込まれません。

[メタデータを書き込む] オプションを選択する場合は、BigQuery テーブルに次の表に示すフィールドがあることを確認してください。

メタデータを書き込むオプションを選択しない場合、use_topic_schema が true でない限り、宛先 BigQuery テーブルでは data フィールドのみが必要になります。[メタデータを書き込む] と [トピック スキーマを使用する] の両方のオプションを選択した場合、トピックのスキーマには、次の名前にメタデータ パラメータと一致する名前のフィールドを含めないでください。この制限には、スネークケース パラメータのキャメルケース バージョンが含まれます。

パラメータ
subscription_name

STRING

サブスクリプションの名前。

message_id

STRING

メッセージの ID

publish_time

TIMESTAMP

メッセージのパブリッシュ時刻。

data

BYTES、STRING、または JSON

メッセージの本文。

data フィールドは、[トピック スキーマを使用する] を選択しないすべての宛先 BigQuery テーブルに必要です。フィールドの型が JSON の場合、メッセージ本文を有効な JSON にする必要があります。

attributes

STRING または JSON

すべてのメッセージ属性を含む JSON オブジェクト。また、存在する場合、順序指定キーなど、Pub/Sub メッセージの一部である追加のフィールドも含まれています。

BigQuery サブスクリプションの作成

次のサンプルでは、BigQuery 配信でサブスクリプションを作成する方法を示します。

コンソール

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    サブスクリプションに移動

  2. [サブスクリプションを作成] をクリックします。
  3. [サブスクリプション ID] フィールドに名前を入力します。

    サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  4. プルダウン メニューからトピックを選択するか、作成します。サブスクリプションがトピックからメッセージを受信します。
  5. [BigQuery への書き込み] として [配信タイプ] を選択します。
  6. BigQuery テーブルのプロジェクトを選択します。
  7. 既存のデータセットを選択するか、新規に作成します。

    データセットの作成方法については、データセットの作成をご覧ください。

  8. 既存のテーブルを選択するか、新規に作成します。

    テーブルの作成方法については、テーブルの作成をご覧ください。

  9. メッセージ エラーを処理するには、デッドレターを有効にすることを強くおすすめします。

    詳細については、デッドレター トピックをご覧ください。

  10. [作成] をクリックします。

[トピック] ページからサブスクリプションを作成することもできます。 このショートカットは、トピックとサブスクリプションの関連付けに使用できます。

  1. Google Cloud コンソールで [トピック] ページに移動します。

    トピックに移動

  2. サブスクリプションを作成するトピックの横にある をクリックします。
  3. コンテキスト メニューから [サブスクリプションを作成] を選択します。
  4. [BigQuery への書き込み] として [配信タイプ] を選択します。
  5. BigQuery テーブルのプロジェクトを選択します。
  6. 既存のデータセットを選択するか、新規に作成します。

    データセットの作成方法については、データセットの作成をご覧ください。

  7. 既存のテーブルを選択するか、新規に作成します。

    データセットの作成方法については、テーブルの作成をご覧ください。

  8. メッセージ エラーを処理するには、デッドレターを有効にすることを強くおすすめします。

    詳細については、デッドレター トピックをご覧ください。

  9. [作成] をクリックします。

gcloud

  1. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  2. Pub/Sub サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを使用します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID

    以下を置き換えます。

    • SUBSCRIPTION_ID: サブスクリプションの ID を指定します。
    • TOPIC_ID: トピックの ID を指定します。このトピックでは、スキーマが必要です。
    • PROJECT_ID: プロジェクトの ID を指定します。
    • DATASET_ID: 既存のデータセットの ID を指定します。データセットを作成するには、データセットの作成をご覧ください。
    • TABLE_ID: 既存のテーブルの ID を指定します。 トピックにスキーマがない場合、このテーブルには data フィールドが必要です。テーブルを作成するには、スキーマ定義を含む空のテーブルを作成するをご覧ください。

C++

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C# 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, table string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: pubsub.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

require "google/cloud/pubsub"

##
# Shows how to create a BigQuery subscription where messages published
# to a topic populates a BigQuery table.
#
# @param project_id [String]
# Your Google Cloud project (e.g. "my-project")
# @param topic_id [String]
# Your topic name (e.g. "my-secret")
# @param subscription_id [String]
# ID for new subscription to be created (e.g. "my-subscription")
# @param bigquery_table_id [String]
# ID of bigquery table (e.g "my-project:dataset-id.table-id")
#
def pubsub_create_bigquery_subscription project_id:, topic_id:, subscription_id:, bigquery_table_id:
  pubsub = Google::Cloud::Pubsub.new project_id: project_id
  topic = pubsub.topic topic_id
  subscription = topic.subscribe subscription_id,
                                 bigquery_config: {
                                   table: bigquery_table_id,
                                   write_metadata: true
                                 }
  puts "BigQuery subscription created: #{subscription_id}."
  puts "Table for subscription is: #{bigquery_table_id}"
end

次のステップ