.NET での Cloud Pub/Sub の使用

アプリの多くは、ウェブ リクエストのコンテキストの外部でバックグラウンド処理を行います。このサンプルでは、Bookshelf アプリが個別のバックグラウンド ワーカーにタスクを送信して実行します。ワーカーは Google Books API から情報を収集し、データベースの書籍情報を更新します。このサンプルでは、App Engine の個別サービスの設定方法、App Engine フレキシブル環境でワーカーの処理を実行する方法、ライフサイクル イベントへの対応方法を解説します。

このページは複数ページからなるチュートリアルの一部です。最初からの説明や設定手順を確認するには、.NET Bookshelf アプリに移動してください。

設定の構成

  1. Visual Studio でサンプルアプリを開くには、getting-started-dotnet\aspnet\5-pubsub ディレクトリで、5-pubsub をダブルクリックします。

  2. [ソリューション エクスプローラ] パネルで [Bookshelf] > [Web.config] の順にクリックします。

  3. bookshelf\Web.config で次の操作を行います。

    1. GoogleCloudSamples:ProjectId に、プロジェクト ID を設定します。

    2. GoogleCloudSamples:BookStore に、このチュートリアルの構造化データの使用で使用したのと同じ値を設定します。

    3. 構造化データの手順で Cloud SQL または SQL Server を使用した場合は、<connectionStrings> XML 要素を見つけ、その手順で使用したのと同じ値を connectionString に設定します。

    4. GoogleCloudSamples:BucketName に、前に作成した Cloud Storage バケットの名前を設定します。

  4. bookshelf\Web.config を保存して閉じます。

  5. [ソリューション エクスプローラ] パネルで [Worker] > [Web.config] に移動します。

  6. worker\Web.config で次の操作を行います。

    1. GoogleCloudSamples:ProjectId に、プロジェクト ID を設定します。

    2. GoogleCloudSamples:BookStore に、このチュートリアルの構造化データの使用で使用したのと同じ値を設定します。

    3. 構造化データの手順で Cloud SQL または SQL Server を使用した場合は、<connectionStrings> XML 要素を見つけ、その手順で使用したのと同じ値を connectionString に設定します。

    4. GoogleCloudSamples:BucketName に、前に作成した Cloud Storage バケットの名前を設定します。

  7. worker\Web.config を保存して閉じます。

ローカルマシンでのアプリの実行

  1. Visual Studio の [ソリューション エクスプローラ] パネルで、[ソリューション] を右クリックし、[スタートアップ プロジェクトの設定] を選択します。

    起動プロジェクトを設定する

  2. [マルチ スタートアップ プロジェクト] をクリックします。

  3. [Bookshelf] 行と [Worker] 行で [アクション] を [開始] に設定し、[OK] をクリックします。

    開始する本棚とワーカーを設定する

  4. F5 キーを押してプロジェクトを実行します。

  5. 本棚に書籍を何冊か追加します。アプリとワーカー インスタンスの両方がローカルで実行されている場合、ワーカーによるバックグラウンドでの書籍情報の更新を確認できます。

Bookshelf アプリを Compute Engine にデプロイする

  1. Visual Studio の [ソリューション エクスプローラ] パネルで、[Bookshelf] を右クリックし、[発行] をクリックします。

    アプリを公開

  2. このチュートリアルの Cloud Datastore の使用と同じ手順に従って、新しいカスタム プロファイルを作成します。

  3. [発行] をクリックします。

Compute Engine にワーカーをデプロイする

  1. Visual Studio の [ソリューション エクスプローラ] パネルで、[Worker] を右クリックし、[発行] をクリックします。

  2. このチュートリアルの Cloud Datastore の使用と同じ手順に従って、新しいカスタム プロファイルを作成します。

  3. [発行] をクリックします。

Compute Engine でアプリを実行する

ウェブブラウザに、最初の Compute Engine インスタンスのアドレスを入力します。

アプリの構造

この図は、アプリを構成するコンポーネントと、互いの関係を示しています。

Auth サンプルの構造

コードの説明

このセクションでは、キューを作成してそこにタスクを追加し、ワーカーを使用してキュー内のタスクを処理する手順を説明します。

キューを作成する

キューは、Cloud Pub/Sub のトピックとサブスクリプションから成ります。 キューを構成するトピックとサブスクリプションの図

QueueMessage には、Google Books API で検索する書籍の ID が格納されます。

private class QueueMessage
{
    public long BookId;
};

書籍 ID は book-process-queue という名前のトピックに追加されます。このトピックには、shared-worker-subscription という名前のサブスクリプションが登録されます。タスクを実行する際、ワーカーがこのサブスクリプションを監視します。

トピックとサブスクリプションのフルパスには、プロジェクト名が含まれます。

_topicName = new TopicName(projectId, options.TopicId);
_subscriptionName = new SubscriptionName(projectId, options.SubscriptionId);

CreateTopicAndSubscription() は Cloud Pub/Sub 内でトピックとサブスクリプションの作成を試みますが、その前に、既存のトピックとサブスクリプションの有無をチェックします。

public void CreateTopicAndSubscription()
{
    try
    {
        _pub.CreateTopic(_topicName);
        _logger.LogVerbose("Created topic " + _topicName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The topic already exists.  Ok.
        _logger.LogError(_topicName + " already exists", e);
    }
    try
    {
        _sub.CreateSubscription(_subscriptionName, _topicName, null, 0);
        _logger.LogVerbose("Created subscription " + _subscriptionName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The subscription already exists.  Ok.
        _logger.LogError(_subscriptionName + " already exists", e);
    }
}

タスクをキューに入れる

QueueMessage は JSON でエンコードされ、エンコードされた JSON は base64 でエンコードされます。単純な long には過剰なエンコードですが、メッセージに Cloud Pub/Sub API との互換性を持たせるには、この方法でメッセージをエンコードすることをおすすめします。

public void EnqueueBook(long bookId)
{
    var message = new QueueMessage() { BookId = bookId };
    var json = JsonConvert.SerializeObject(message);
    _pub.Publish(_topicName, new[] { new PubsubMessage()
    {
        Data = Google.Protobuf.ByteString.CopyFromUtf8(json)
    } });
}

ワーカー

ワーカーは、Cloud Pub/Sub イベントをリッスンする独立したアプリです。これにより、アプリを 2 つの独立したプロセスに分割し、両者を直接通信させずに Cloud Pub/Sub を介して通信させることができます。

書籍を処理する

書籍を処理するために、タスクは書籍の ID で書籍を取得し、追加情報を探してから、更新された情報をデータベースに保存します。

public void ProcessBook(IBookStore bookStore, long bookId)
{
    var book = bookStore.Read(bookId);
    _logger.LogVerbose($"Found {book.Title}.  Updating.");
    var query = "https://www.googleapis.com/books/v1/volumes?q="
        + Uri.EscapeDataString(book.Title);
    var response = WebRequest.Create(query).GetResponse();
    var reader = new StreamReader(response.GetResponseStream());
    var json = reader.ReadToEnd();
    UpdateBookFromJson(json, book);
    bookStore.Update(book);
}

関数 PullOnce はサブスクリプションからメッセージを読み取り、メッセージごとに ProcessBook を呼び出します。

        private void PullOnce(Action<long> callback, CancellationToken cancellationToken)
        {
            _logger.LogVerbose($"Pulling messages from {_subscriptionName}...");
            // Pull some messages from the subscription.

            var response = _sub.Pull(_subscriptionName, false, 3,
                CallSettings.FromCallTiming(
                    CallTiming.FromExpiration(
                        Expiration.FromTimeout(
                            TimeSpan.FromSeconds(90)))));
            if (response.ReceivedMessages == null)
            {
                // HTTP Request expired because the queue was empty.  Ok.
                _logger.LogVerbose("Pulled no messages.");
                return;
            }
            _logger.LogVerbose($"Pulled {response.ReceivedMessages.Count} messages.");
            if (response.ReceivedMessages.Count == 0)
            {
                return;
            }
            foreach (var message in response.ReceivedMessages)
            {
                try
                {
                    // Unpack the message.
                    byte[] json = message.Message.Data.ToByteArray();
                    var qmessage = JsonConvert.DeserializeObject<QueueMessage>(
                        Encoding.UTF8.GetString(json));
                    // Invoke ProcessBook().
                    callback(qmessage.BookId);
                }
                catch (Exception e)
                {
                    _logger.LogError("Error processing book.", e);
                }
            }
            // Acknowledge the message so we don't see it again.
            var ackIds = new string[response.ReceivedMessages.Count];
            for (int i = 0; i < response.ReceivedMessages.Count; ++i)
                ackIds[i] = response.ReceivedMessages[i].AckId;
            _sub.Acknowledge(_subscriptionName, ackIds);
        }
このページは役立ちましたか?評価をお願いいたします。