Cloud Run for Anthos で Pub/Sub を使用する

このチュートリアルでは、Cloud Run for Anthos サービスを作成して、デプロイし、Pub/Sub push サブスクリプションから呼び出す方法について説明します。

目標

  • サービスを作成、ビルドして、Cloud Run for Anthos にデプロイする
  • Pub/Sub トピックにメッセージを公開してサービスを呼び出す

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Cloud Run for Anthos API を有効にする
  7. gcloud CLI をインストールして初期化する
  8. kubectl コンポーネントをインストールします。
    gcloud components install kubectl
  9. コンポーネントを更新します。
    gcloud components update
  10. Cloud Run for Anthos を使用する場合は、Cloud Run for Anthos の設定の手順に沿って新しいクラスタを作成します。

gcloud のデフォルトを設定する

gcloud に Cloud Run for Anthos サービス用のデフォルトを構成するには、次のようにします。

  1. デフォルト プロジェクトを設定します。

    gcloud config set project PROJECT_ID

    PROJECT_ID は、このチュートリアルで使用するプロジェクトの名前に置き換えます。

  2. クラスタに gcloud を構成します。

    gcloud config set run/platform gke
    gcloud config set run/cluster CLUSTER-NAME
    gcloud config set run/cluster_location REGION

    以下のように置き換えます。

    • CLUSTER-NAME は、クラスタに対して使用した名前に置き換えます。
    • REGION は、選択したサポート対象のクラスタの場所に置き換えます。

Pub/Sub トピックを作成する

このサンプル サービスは Pub/Sub トピックに公開されたメッセージによってトリガーされるため、Pub/Sub でトピックを作成する必要があります。

新しい Pub/Sub トピックを作成するには、次のコマンドを使用します。

gcloud pubsub topics create myRunTopic

myRunTopic を使用するか、Cloud プロジェクト内で一意のトピック名に置き換えます。

コードサンプルを取得する

使用するサンプルコードを取得するには:

  1. ローカルマシンにサンプルアプリのリポジトリのクローンを作成します。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    また、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Go

    git clone https://github.com/GoogleCloudPlatform/golang-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

  2. Cloud Run for Anthos のサンプルコードが含まれているディレクトリに移動します。

    Node.js

    cd nodejs-docs-samples/run/pubsub/

    Python

    cd python-docs-samples/run/pubsub/

    Go

    cd golang-samples/run/pubsub/

    Java

    cd java-docs-samples/run/pubsub/

コードを確認する

このチュートリアルのコードは、次のものから構成されています。

  • 受信リクエストを処理するサーバー。

    Node.js

    Node.js サービスをテストしやすくするため、サーバー構成はサーバーの起動とは別になっています。

    Node.js ウェブサーバーは、app.js 内で設定されています。

    const express = require('express');
    const app = express();
    
    // This middleware is available in Express v4.16.0 onwards
    app.use(express.json());
    ウェブサーバーは index.js で開始します。
    const app = require('./app.js');
    const PORT = parseInt(parseInt(process.env.PORT)) || 8080;
    
    app.listen(PORT, () =>
      console.log(`nodejs-pubsub-tutorial listening on port ${PORT}`)
    );

    Python

    import base64
    import os
    
    from flask import Flask, request
    
    app = Flask(__name__)

    Go

    
    // Sample run-pubsub is a Cloud Run service which handles Pub/Sub messages.
    package main
    
    import (
    	"encoding/json"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"os"
    )
    
    func main() {
    	http.HandleFunc("/", HelloPubSub)
    	// Determine port for HTTP service.
    	port := os.Getenv("PORT")
    	if port == "" {
    		port = "8080"
    		log.Printf("Defaulting to port %s", port)
    	}
    	// Start HTTP server.
    	log.Printf("Listening on port %s", port)
    	if err := http.ListenAndServe(":"+port, nil); err != nil {
    		log.Fatal(err)
    	}
    }
    

    Java

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class PubSubApplication {
      public static void main(String[] args) {
        SpringApplication.run(PubSubApplication.class, args);
      }
    }

  • Pub/Sub メッセージを処理し、応答メッセージをログに記録するハンドラ。

    Node.js

    app.post('/', (req, res) => {
      if (!req.body) {
        const msg = 'no Pub/Sub message received';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
      if (!req.body.message) {
        const msg = 'invalid Pub/Sub message format';
        console.error(`error: ${msg}`);
        res.status(400).send(`Bad Request: ${msg}`);
        return;
      }
    
      const pubSubMessage = req.body.message;
      const name = pubSubMessage.data
        ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
        : 'World';
    
      console.log(`Hello ${name}!`);
      res.status(204).send();
    });

    Python

    @app.route("/", methods=["POST"])
    def index():
        envelope = request.get_json()
        if not envelope:
            msg = "no Pub/Sub message received"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        if not isinstance(envelope, dict) or "message" not in envelope:
            msg = "invalid Pub/Sub message format"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400
    
        pubsub_message = envelope["message"]
    
        name = "World"
        if isinstance(pubsub_message, dict) and "data" in pubsub_message:
            name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
    
        print(f"Hello {name}!")
    
        return ("", 204)
    
    

    Go

    
    // PubSubMessage is the payload of a Pub/Sub event.
    // See the documentation for more details:
    // https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
    type PubSubMessage struct {
    	Message struct {
    		Data []byte `json:"data,omitempty"`
    		ID   string `json:"id"`
    	} `json:"message"`
    	Subscription string `json:"subscription"`
    }
    
    // HelloPubSub receives and processes a Pub/Sub push message.
    func HelloPubSub(w http.ResponseWriter, r *http.Request) {
    	var m PubSubMessage
    	body, err := ioutil.ReadAll(r.Body)
    	if err != nil {
    		log.Printf("ioutil.ReadAll: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    	// byte slice unmarshalling handles base64 decoding.
    	if err := json.Unmarshal(body, &m); err != nil {
    		log.Printf("json.Unmarshal: %v", err)
    		http.Error(w, "Bad Request", http.StatusBadRequest)
    		return
    	}
    
    	name := string(m.Message.Data)
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello %s!", name)
    }
    

    Java

    import com.example.cloudrun.Body;
    import java.util.Base64;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    // PubsubController consumes a Pub/Sub message.
    @RestController
    public class PubSubController {
      @RequestMapping(value = "/", method = RequestMethod.POST)
      public ResponseEntity receiveMessage(@RequestBody Body body) {
        // Get PubSub message from request body.
        Body.Message message = body.getMessage();
        if (message == null) {
          String msg = "Bad Request: invalid Pub/Sub message format";
          System.out.println(msg);
          return new ResponseEntity(msg, HttpStatus.BAD_REQUEST);
        }
    
        String data = message.getData();
        String target =
            !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
        String msg = "Hello " + target + "!";
    
        System.out.println(msg);
        return new ResponseEntity(msg, HttpStatus.OK);
      }
    }

    正確な HTTP レスポンス コードを返すようにサービスをコーディングする必要があります。HTTP 200204 などの成功コードは、Pub/Sub メッセージの処理の完了を意味します。HTTP 400500 などのエラーコードは、push を使用したメッセージの受信で説明されているように、メッセージが再試行されることを示します。

  • サービスの動作環境を定義する DockerfileDockerfile の内容は言語によって異なります。

    Node.js

    
    # Use the official lightweight Node.js 12 image.
    # https://hub.docker.com/_/node
    FROM node:18-slim
    
    # Create and change to the app directory.
    WORKDIR /usr/src/app
    
    # Copy application dependency manifests to the container image.
    # A wildcard is used to ensure both package.json AND package-lock.json are copied.
    # Copying this separately prevents re-running npm install on every code change.
    COPY package*.json ./
    
    # Install dependencies.
    # If you add a package-lock.json speed your build by switching to 'npm ci'.
    # RUN npm ci --only=production
    RUN npm install --production
    
    # Copy local code to the container image.
    COPY . .
    
    # Run the web service on container startup.
    CMD [ "npm", "start" ]
    

    Python

    
    # Use the official Python image.
    # https://hub.docker.com/_/python
    FROM python:3.10
    
    # Allow statements and log messages to immediately appear in the Cloud Run logs
    ENV PYTHONUNBUFFERED True
    
    # Copy application dependency manifests to the container image.
    # Copying this separately prevents re-running pip install on every code change.
    COPY requirements.txt ./
    
    # Install production dependencies.
    RUN pip install -r requirements.txt
    
    # Copy local code to the container image.
    ENV APP_HOME /app
    WORKDIR $APP_HOME
    COPY . ./
    
    # Run the web service on container startup.
    # Use gunicorn webserver with one worker process and 8 threads.
    # For environments with multiple CPU cores, increase the number of workers
    # to be equal to the cores available.
    # Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
    CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
    

    Go

    
    # Use the offical golang image to create a binary.
    # This is based on Debian and sets the GOPATH to /go.
    # https://hub.docker.com/_/golang
    FROM golang:1.17-buster as builder
    
    # Create and change to the app directory.
    WORKDIR /app
    
    # Retrieve application dependencies.
    # This allows the container build to reuse cached dependencies.
    # Expecting to copy go.mod and if present go.sum.
    COPY go.* ./
    RUN go mod download
    
    # Copy local code to the container image.
    COPY . ./
    
    # Build the binary.
    RUN go build -v -o server
    
    # Use the official Debian slim image for a lean production container.
    # https://hub.docker.com/_/debian
    # https://docs.docker.com/develop/develop-images/multistage-build/#use-multi-stage-builds
    FROM debian:buster-slim
    RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        ca-certificates && \
        rm -rf /var/lib/apt/lists/*
    
    # Copy the binary to the production image from the builder stage.
    COPY --from=builder /app/server /server
    
    # Run the web service on container startup.
    CMD ["/server"]
    

    Java

    このサンプルでは、Jib を使用して一般的な Java ツールにより Docker イメージをビルドします。Jib は、Dockerfile や Docker をインストールせずにコンテナのビルドを最適化します。Jib を使用して Java コンテナを構築する方法の詳細を確認します。
    <plugin>
      <groupId>com.google.cloud.tools</groupId>
      <artifactId>jib-maven-plugin</artifactId>
      <version>3.2.1</version>
      <configuration>
        <to>
          <image>gcr.io/PROJECT_ID/pubsub</image>
        </to>
      </configuration>
    </plugin>
    

Pub/Sub リクエストの送信元を認証する方法の詳細は、以下の Pub/Sub との統合をご覧ください。

コードの配布

コードの配布は、Cloud Build でコンテナ イメージをビルドする、Container Registry にコンテナ イメージをアップロードする、Cloud Run にコンテナ イメージをデプロイするという 3 ステップで構成されます。

コードを配布するには:

  1. コンテナをビルドして、Container Registry に公開します。

    Node.js

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    PROJECT_ID は Cloud プロジェクト ID、pubsub はサービスに付ける名前です。

    ビルドが成功すると、ID、作成時間、イメージ名を含む SUCCESS メッセージが表示されます。イメージが Container Registry に保存されます。このイメージは必要に応じて再利用できます。

    Python

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    PROJECT_ID は Cloud プロジェクト ID、pubsub はサービスに付ける名前です。

    ビルドが成功すると、ID、作成時間、イメージ名を含む SUCCESS メッセージが表示されます。イメージが Container Registry に保存されます。このイメージは必要に応じて再利用できます。

    Go

    gcloud builds submit --tag gcr.io/PROJECT_ID/pubsub

    PROJECT_ID は Cloud プロジェクト ID、pubsub はサービスに付ける名前です。

    ビルドが成功すると、ID、作成時間、イメージ名を含む SUCCESS メッセージが表示されます。イメージが Container Registry に保存されます。このイメージは必要に応じて再利用できます。

    Java

    mvn compile jib:build -Dimage=gcr.io/PROJECT_ID/pubsub

    PROJECT_ID は Cloud プロジェクト ID、pubsub はサービスに付ける名前です。

    ビルドが成功すると、BUILD SUCCESS メッセージが表示されます。イメージが Container Registry に保存されます。このイメージは必要に応じて再利用できます。

  2. 次のコマンドを実行して、アプリをデプロイします。

    gcloud run deploy pubsub-tutorial --image gcr.io/PROJECT_ID/pubsub

    PROJECT_ID を Cloud プロジェクト ID に置き換えます。pubsub はコンテナ名、pubsub-tutorial はサービスの名前です。コンテナ イメージは、gcloud の設定で構成したサービスとクラスタにデプロイされることに注意してください。

    デプロイが完了するまで待ちます。30 秒ほどかかる場合があります。成功すると、コマンドラインにサービス URL が表示されます。この URL は、Pub/Sub サブスクリプションの構成で使用します。

  3. サービスにコードの更新をデプロイする場合は、上記のステップを繰り返します。サービスをデプロイするたびに、リビジョンが作成されます。準備ができると、トラフィックの送信が自動的に開始します。

Pub/Sub との統合

Cloud Run for Anthos サービスをデプロイしたので、メッセージを push するように Pub/Sub を構成します。

Pub/Sub とサービスを統合するには:

  1. プロジェクトで Pub/Sub 認証トークンを作成できるようにします。

    gcloud projects add-iam-policy-binding PROJECT_ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    次のように置き換えます。

    • PROJECT_ID を Cloud プロジェクト ID に置き換えます。
    • PROJECT-NUMBER を Cloud プロジェクト番号に置き換えます。
  2. Pub/Sub サブスクリプションの ID を表すサービス アカウントを作成または選択します。

    gcloud iam service-accounts create cloud-run-pubsub-invoker \
         --display-name "Cloud Run for Anthos Pub/Sub Invoker"

    cloud-run-pubsub-invoker を使用するか、Cloud プロジェクト内で一意の名前に置き換えます。

  3. このサービス アカウントを使用して Pub/Sub サブスクリプションを作成します。

    1. クラスタで自動 TLS と HTTPS を有効にし、サービスにドメイン マッピングを追加します。

    2. Pub/Sub のドメイン所有者を登録します。

    3. Pub/Sub メッセージに添付された認証トークンを検証するコードを追加します。サンプルコードは、push エンドポイントによる認証と認可で提供されています。

      認証では、トークンが有効であり、予想されるサービス アカウントに関連付けられている必要があります。Cloud Run とは異なり、Cloud Run for Anthos には、トークンが有効であることや、サービス アカウントに Cloud Run for Anthos サービスを起動する権限があることを示す組み込みの認可チェックがありません。

    4. このサービス アカウントを使用して Pub/Sub サブスクリプションを作成します。

      gcloud pubsub subscriptions create myRunSubscription --topic myRunTopic \
           --push-endpoint=SERVICE-URL/ \
           --push-auth-service-account=cloud-run-pubsub-invoker@PROJECT_ID.iam.gserviceaccount.com

      次のように置き換えます。

      • myRunTopic は、以前に作成したトピックに置き換えます。
      • SERVICE-URL は、カスタム サービス URL に置き換えます。プロトコルとして https を指定します。
      • PROJECT_ID を Cloud プロジェクト ID に置き換えます。

      --push-auth-service-account フラグは、認証と認可のために Pub/Sub の push 機能を有効にします。

これで、サービスが Pub/Sub と完全に統合されました。

試してみる

エンドツーエンドのソリューションをテストするには、次の手順を行います。

  1. トピックに Pub/Sub メッセージを送信します。

    gcloud pubsub topics publish myRunTopic --message "Runner"

    このチュートリアルで説明したコマンドラインを使用する代わりに、プログラムでメッセージを発行することもできます。詳しくは、メッセージの公開をご覧ください。

  2. サービスログに移動します。

    1. Google Cloud コンソールで Cloud Run for Anthos ページに移動します。

      Cloud Run for Anthos に移動

    2. pubsub-tutorial サービスをクリックします。

    3. [ログ] タブを選択します。

      ログが表示されるまで少し時間がかかることがあります。すぐに表示されない場合は、しばらくしてからもう一度確認してください。

  3. 「Hello Runner!」というメッセージを見つけます。

クリーンアップ

Pub/Sub で Cloud Run for Anthos を使用する場合のさらに詳しいユースケースのチュートリアルを行うには、クリーンアップをスキップして、画像の処理のチュートリアルに進んでください。

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用し、このチュートリアルで変更を加えずに残す場合は、チュートリアル用に作成したリソースを削除します。

プロジェクトを削除する

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

チュートリアル リソースを削除する

  1. このチュートリアルでデプロイした Cloud Run for Anthos サービスを削除します。

    gcloud run services delete SERVICE-NAME

    SERVICE-NAME は、選択したサービス名です。

    Google Cloud コンソールから Cloud Run for Anthos のサービスを削除することもできます。

    Cloud Run for Anthos に移動

  2. チュートリアルを設定したときに追加した gcloud のデフォルト構成を削除します。

     gcloud config unset run/platform
     gcloud config unset run/cluster
     gcloud config unset run/cluster_location
    
  3. プロジェクト構成を削除します。

     gcloud config unset project
    
  4. このチュートリアルで作成した他の Google Cloud リソースを削除します。

次のステップ