サーバーレス

BigQuery イベントで Cloud Run アクションをトリガーする方法

#da

※この投稿は米国時間 2021 年 2 月 11 日に、Google Cloud blog に投稿されたものの抄訳です。

多くの BigQuery ユーザーは、データベース トリガー、つまり特定の BigQuery テーブル、モデル、データセットのイベントに応答して手続き型コードを実行する方法を求めています。新しいテーブル パーティションが作成されるたびに ELT ジョブを実行する場合や、新しい行がテーブルに挿入されるたびに ML モデルを再トレーニングする場合があるかもしれません。

この記事では、「クラウドが簡単になる」という一般的なカテゴリにおいて、BigQuery と Cloud Run を簡単かつ適切に連携させる方法をご紹介します。BigQuery も Cloud Run もよく使うようであれば、一緒に使用することでさらに便利になるでしょう。

Cloud Run は、BigQuery が監査ログに書き込むときにトリガーされます。BigQuery のすべてのデータアクセスはログに記録されるため(オフにする方法はありません)、必要なのは、探している正確なログメッセージを見つけることだけです。

以下にその方法を説明していきます。

BigQuery イベントを探す

実際のデータセットが台無しになるのを恐れる方が多いと思いますので、ここでは、BigQuery のプロジェクトに cloud_run_tmp という名前の一時ファイルを作成します。

このプロジェクトで、テーブルを作成し、いくつかの行を挿入して試してみましょう。BigQuery パブリック データセットからいくつかの行を取得して、以下のようにテーブルを作成します。

  CREATE OR REPLACE TABLE cloud_run_tmp.cloud_run_trigger AS
SELECT 
  state, gender, year, name, number
FROM `bigquery-public-data.usa_names.usa_1910_current` 
LIMIT 10000

次に、データベース トリガーを作成する挿入クエリを実行します。

  INSERT INTO cloud_run_tmp.cloud_run_trigger
VALUES('OK', 'F', 2021, 'Joe', 3)

次に、別の Chrome タブで、このリンクをクリックして、Cloud Logging のBigQuery 監査イベントをフィルタリングします。

以下のイベントを発見しました。

Screen_Shot_2021-02-03

特定の BigQuery アクションには監査ログが複数あるので注意してください。たとえばこのケースでは、クエリを送信すると、すぐにログが生成されます。しかし、BigQuery はクエリが解析された後でのみインタラクションを行うテーブルを認識するため、初期ログにはテーブル名がありません。古い監査ログは不要です。アクションを明確に識別する一意の属性セットを確実に探してください。

行を挿入する場合の組み合わせは以下のとおりです。

●メソッドは google.cloud.bigquery.v2.JobService.InsertJob です

●挿入先のテーブルの名前は protoPayload.resourceName です

●データセット ID は resource.labels.dataset_id として利用できます

●挿入される行の数は protoPayload.metadata.tableDataChanged.insertedRowsCount です

Cloud Run アクションを記述する

求めているペイロードがわかったら、Cloud Run アクションを記述できます。Python で Flask アプリとして作成しましょう(完全なコードは GitHub にあります)。

まず、これが処理するイベントであることを確認します。

  @app.route('/', methods=['POST'])
def index():
    # Gets the Payload data from the Audit Log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'cloud_run_tmp' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except:
        # if these fields are not in the JSON, ignore
        pass
    return "ok", 200

これが目的のイベントであることが確認できたら、実行するアクションを実行します。ここで、集計を行って新しいテーブルを書き出しましょう。

  def create_agg():
    client = bigquery.Client()
    query = """
CREATE OR REPLACE TABLE cloud_run_tmp.created_by_trigger AS
SELECT 
  name, SUM(number) AS n
FROM cloud_run_tmp.cloud_run_trigger
GROUP BY name
ORDER BY n desc
LIMIT 10
    """
    client.query(query)
    return query

コンテナの Dockerfile は、Flask と BigQuery クライアント ライブラリをインストールする基本的な Python コンテナです。

  FROM python:3.9-slim
RUN pip install Flask==1.1.2 gunicorn==20.0.4 google-cloud-bigquery
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY *.py ./
CMD exec gunicorn --bind :$PORT main:app

Cloud Run をデプロイする

コンテナをビルドして、いくつかの gcloud コマンドを使用してデプロイします。

  SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

イベント トリガーをセットアップする

トリガーが機能するためには、Cloud Run のサービス アカウントにいくつかの権限が必要です。

  gcloud projects add-iam-policy-binding $PROJECT \
    --member="serviceAccount:service-${PROJECT_NO}@gcp-sa-pubsub.iam.gserviceaccount.com"\
    --role='roles/iam.serviceAccountTokenCreator'

gcloud projects add-iam-policy-binding $PROJECT \
    --member=serviceAccount:${SVC_ACCOUNT} \
    --role='roles/eventarc.admin'

最後に、イベント トリガーを作成します。

  gcloud eventarc triggers create ${SERVICE}-trigger \
  --location ${REGION} --service-account ${SVC_ACCOUNT} \
  --destination-run-service ${SERVICE}  \
  --event-filters type=google.cloud.audit.log.v1.written \
  --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
  --event-filters serviceName=bigquery.googleapis.com

ここで重要なのは、BigQuery が作成した挿入ログによってトリガーしているということです。そのためアクションでは、ペイロードに基づいてこれらのイベントをフィルタリングする必要がありました。

どのイベントがサポートされているのでしょうか?Cloud Run のウェブ コンソールを確認するのが簡単です。以下に、その一部をご紹介します。

image_CvEAyeA

お試しください

ここでは BigQuery -> Cloud Run のトリガーとアクションを試します。BigQuery コンソールに移動し、1、2 行挿入します。

  INSERT INTO cloud_run_tmp.cloud_run_trigger
VALUES('OK', 'F', 2021, 'Joe', 3)

created_by_trigger という新しいテーブルが作成されます。これで BigQuery のデータベース イベントで Cloud Run アクションを正常にトリガーできました。

ぜひお試しください。

リソース

  1. すべてのコードは、手順付きの README とともに GitHub に掲載されています。

  2. 本ブログ投稿は、『BigQuery: The Definitive Guide』を更新したものです。私は、この本の内容を年に 1 回程度更新し、このようなブログの形で最新情報をお知らせすることを目標としています。

  3. 以前に投稿したこのような更新ブログは、この本の GitHub リポジトリからリンクされています。

Prashant Gulati に感謝します。

-Google Cloud 分析および AI ソリューション部門責任者 Lak Lakshmanan