Cloud Run functions の関数で Looker アクションを使用して BigQuery に書き戻す

Looker の多くのお客様は、データ ウェアハウス内のデータのレポート作成だけでなく、データ ウェアハウスへの書き戻しや更新もユーザーが行えるようにしたいと考えています。

Looker は、Action API を通じて、任意のデータ ウェアハウスまたは宛先でこのユースケースをサポートしています。このドキュメント ページでは、 Google Cloud インフラストラクチャを使用しているお客様が Cloud Run functions にソリューションをデプロイして BigQuery に書き戻す方法について説明します。このページでは、次のトピックについて説明します。

ソリューションの考慮事項

この考慮事項のリストを使用して、このソリューションがニーズに合っていることを確認してください。

  • Cloud Run functions
    • Cloud Run functions を使用する理由Google の「サーバーレス」サービスである Cloud Run functions は、運用とメンテナンスの容易さの点で優れた選択肢です。注意すべき点として、レイテンシ(特にコールド呼び出しの場合)は、専用サーバーに依存するソリューションよりも長くなる可能性があります。
    • 言語とランタイム: Cloud Run functions は、複数の言語とランタイムをサポートしています。このドキュメント ページでは、JavaScript と Node.js の例に焦点を当てます。ただし、コンセプトは他のサポート対象言語とランタイムに直接変換できます。
  • BigQuery
    • BigQuery を選択する理由このドキュメント ページでは、すでに BigQuery を使用していることを前提としていますが、BigQuery は一般的にデータ ウェアハウスとして最適な選択肢です。次の点にご注意ください。
      • BigQuery Storage Write API: BigQuery には、SQL ベースのジョブのデータ操作言語(DML)ステートメントなど、データ ウェアハウス内のデータを更新するための複数のインターフェースが用意されています。ただし、大量の書き込みには BigQuery Storage Write API が最適です。
      • 更新ではなく追加: このソリューションでは、行の更新ではなく追加のみが行われますが、追加専用ログからクエリ時に「現在の状態」テーブルを常に導出できるため、更新をシミュレートできます。
  • サポートしているサービス
    • Secret Manager: Secret Manager は、シークレット値が関数の構成に直接保存されるなど、アクセスしやすい場所に保存されないようにします。
    • Identity and Access Management(IAM): IAM は、実行時に必要なシークレットにアクセスし、目的の BigQuery テーブルに書き込む権限を関数に付与します。
    • Cloud Build: このページでは Cloud Build について詳しく説明しませんが、Cloud Run functions はバックグラウンドで Cloud Build を使用します。また、Cloud Build を使用して、Git リポジトリのソースコードの変更から関数への継続的デプロイの更新を自動化することもできます。
  • アクションとユーザー認証
    • Cloud Run サービス アカウント: 組織のファーストパーティ アセットとリソースとの統合に Looker アクションを使用する最も簡単で主な方法は、Looker Action API のトークンベースの認証メカニズムを使用して Looker インスタンスからのリクエストとして認証し、サービス アカウントを使用して BigQuery のデータを更新する関数を承認することです。
    • OAuth: このページでは説明しませんが、Looker Action API の OAuth 機能を使用することもできます。このアプローチはより複雑で、通常は必要ありませんが、Looker でのアクセス権や関数コード内のアドホック ロジックを使用するのではなく、IAM を使用してテーブルへの書き込みに対するエンドユーザーのアクセス権を定義する必要がある場合は使用できます。

デモコードのチュートリアル

デモ アクションのロジック全体を含む単一のファイルが GitHub で入手可能です。このセクションでは、コードの主な要素について説明します。

セットアップ コード

最初のセクションには、アクションの書き込み先となるテーブルを識別するデモ定数がいくつかあります。このページの後半のデプロイガイドのセクションでは、プロジェクト ID をご自身の ID に置き換えるよう指示されます。これがコードに必要な唯一の変更です。

/*** Demo constants */
const projectId = "your-project-id"
const datasetId = "demo_dataset"
const tableId = "demo_table"

次のセクションでは、アクションで使用するコードの依存関係を宣言して初期化します。Secret Manager Node.js モジュールを使用して「コード内」で Secret Manager にアクセスする例を示しますが、Cloud Run functions の組み込み機能を使用して、初期化中にシークレットを取得することで、このコード依存関係を排除することもできます。

/*** Code Dependencies ***/
const crypto = require("crypto")
const {SecretManagerServiceClient} = require('@google-cloud/secret-manager')
const secrets = new SecretManagerServiceClient()
const BigqueryStorage = require('@google-cloud/bigquery-storage')
const BQSManagedWriter = BigqueryStorage.managedwriter

参照される @google-cloud 依存関係は、依存関係をプリロードして Node.js ランタイムで使用できるように、package.json ファイルでも宣言されています。crypto は組み込みの Node.js モジュールであり、package.json で宣言されていません。

HTTP リクエストの処理とルーティング

コードが Cloud Run functions ランタイムに公開するメイン インターフェースは、Node.js Express ウェブサーバーの規約に従うエクスポートされた JavaScript 関数です。特に、関数は 2 つの引数を受け取ります。1 つ目は HTTP リクエストを表し、そこからさまざまなリクエスト パラメータと値を読み取ることができます。2 つ目はレスポンス オブジェクトを表し、そこにレスポンス データを発行します。関数の名前は任意ですが、デプロイガイド セクションで説明するように、後で Cloud Run functions に名前を指定する必要があります。

/*** Entry-point for requests ***/
exports.httpHandler = async function httpHandler(req,res) {

httpHandler 関数の最初のセクションでは、アクションが認識するさまざまなルートを宣言します。これは、単一のアクションに必要な Action API エンドポイントと、ファイル内で後で定義される各ルートを処理する関数を反映したものです。

一部のアクションと Cloud Run functions の例では、Cloud Run functions のデフォルトのルーティングと 1 対 1 で対応するように、このようなルートごとに個別の関数をデプロイしますが、関数は、ここで示すように、コード内で追加の「サブルーティング」を適用できます。これは最終的には好みの問題ですが、コード内でこの追加のルーティングを行うことで、デプロイする必要がある関数の数を最小限に抑え、すべてのアクションのエンドポイントで一貫性のある単一のコード状態を維持できます。

    const routes = {
        "/": [hubListing],
        "/status": [hubStatus], // Debugging endpoint. Not required.
        "/action-0/form": [
            requireInstanceAuth,
            action0Form
            ], 
        "/action-0/execute": [
            requireInstanceAuth,
            processRequestBody,
            action0Execute
            ]
        }

HTTP ハンドラ関数の残りの部分では、上記のルート宣言に対する HTTP リクエストの処理を実装し、これらのハンドラからの戻り値をレスポンス オブジェクトに接続します。

    try {
        const routeHandlerSequence = routes[req.path] || [routeNotFound]
        for(let handler of routeHandlerSequence) {
            let handlerResponse = await handler(req)
            if (!handlerResponse) continue 
            return res
                .status(handlerResponse.status || 200)
                .json(handlerResponse.body || handlerResponse)
            }
        }
    catch(err) {
        console.error(err)
        res.status(500).json("Unhandled error. See logs for details.")
        }
    }

HTTP ハンドラとルートの宣言が終わったので、実装する必要がある 3 つのメイン アクション エンドポイントについて詳しく見ていきましょう。

アクション リスト エンドポイント

Looker 管理者が Looker インスタンスをアクション サーバーに初めて接続すると、Looker は「アクション リスト エンドポイント」と呼ばれる指定された URL を呼び出して、サーバーで使用可能なアクションに関する情報を取得します。

前に示したルート宣言では、このエンドポイントを関数の URL のルートパス(/)で使用できるようにし、hubListing 関数で処理されることを示しました。

次の関数定義からわかるように、コードはそれほど多くありません。毎回同じ JSON データを返すだけです。注意点として、この関数は「独自の」URL を一部のフィールドに動的に含めるため、Looker インスタンスは後で同じ関数にリクエストを送信できます。

async function hubListing(req){
    return {
        integrations: [
            {
                name: "demo-bq-insert",
                label: "Demo BigQuery Insert",
                supported_action_types: ["cell", "query", "dashboard"],
                form_url:`${process.env.CALLBACK_URL_PREFIX}/action-0/form`,
                url: `${process.env.CALLBACK_URL_PREFIX}/action-0/execute`,
                icon_data_uri: "data:image/png;base64,...",
                supported_formats:["inline_json"],
                supported_formattings:["unformatted"],
                required_fields:[
                    // You can use this to make your action available
                    // for specific queries/fields
                    // {tag:"user_id"}
                    ],
                params: [
                    // You can use this to require parameters, either
                    // from the Action's administrative configuration,
                    // or from the invoking user's user attributes. 
                    // A common use case might be to have the Looker
                    // instance pass along the user's identification to
                    // allow you to conditionally authorize the action:
                    {name: "email", label: "Email", user_attribute_name: "email", required: true}
                    ]
                }
            ]
        }
    }

デモ用に、このリスティングを取得する際に認証を必要としないコードを使用しています。ただし、アクション メタデータが機密情報であると判断した場合は、次のセクションに示すように、このルートの認証を必須にすることもできます。

また、Cloud Run functions 関数は複数のアクションを公開して処理できるため、/action-X/... のルート規約が使用されています。ただし、デモの Cloud Run functions では 1 つのアクションのみを実装します。

アクション フォームのエンドポイント

すべてのユースケースでフォームが必要になるわけではありませんが、データベースのライトバックのユースケースにはフォームが適しています。ユーザーは Looker でデータを検査し、データベースに挿入する値を指定できるためです。アクション リストで form_url パラメータが提供されているため、ユーザーがアクションの操作を開始すると、Looker はこの アクション フォーム エンドポイントを呼び出して、ユーザーからキャプチャする追加データを決定します。

ルート宣言では、このエンドポイントを /action-0/form パスで使用できるようにし、requireInstanceAuthaction0Form の 2 つのハンドラを関連付けました。

複数のハンドラを許可するようにルート宣言を設定したのは、一部のロジックを複数のエンドポイントで再利用できるようにするためです。

たとえば、requireInstanceAuth が複数のルートで使用されていることがわかります。このハンドラは、リクエストが Looker インスタンスから送信されたものであることを確認する必要がある場合に使用します。ハンドラは、Secret Manager から想定されるシークレット トークン値を取得し、その想定されるトークン値がないリクエストを拒否します。

async function requireInstanceAuth(req) {
    const lookerSecret = await getLookerSecret()
    if(!lookerSecret){return}
    const expectedAuthHeader = `Token token="${lookerSecret}"`
    if(!timingSafeEqual(req.headers.authorization,expectedAuthHeader)){
        return {
            status:401,
            body: {error: "Looker instance authentication is required"}
            }
        }
    return

    function timingSafeEqual(a, b) {
        if(typeof a !== "string"){return}
        if(typeof b !== "string"){return}
        var aLen = Buffer.byteLength(a)
        var bLen = Buffer.byteLength(b)
        const bufA = Buffer.allocUnsafe(aLen)
        bufA.write(a)
        const bufB = Buffer.allocUnsafe(aLen) //Yes, aLen
        bufB.write(b)

        return crypto.timingSafeEqual(bufA, bufB) && aLen === bLen;
        }
    }

標準の等価性チェック(==)ではなく timingSafeEqual 実装を使用しているのは、攻撃者が秘密の値を見つけ出すのに役立つサイドチャネル タイミング情報の漏洩を防ぐためです。

リクエストがインスタンス認証チェックに合格すると、リクエストは action0Form ハンドラによって処理されます。

async function action0Form(req){
    return [
        {name: "choice",  label: "Choose", type:"select", options:[
            {name:"Yes", label:"Yes"},
            {name:"No", label:"No"},
            {name:"Maybe", label:"Maybe"}
            ]},
        {name: "note", label: "Note", type: "textarea"}
        ]
    }

デモの例は非常に静的ですが、特定のユースケースではフォームコードをよりインタラクティブにすることができます。たとえば、最初のプルダウンでユーザーが選択した内容に応じて、異なるフィールドを表示できます。

アクションの実行エンドポイント

アクション実行エンドポイントは、アクションのロジックの大部分が存在する場所であり、BigQuery 挿入ユースケースに固有のロジックを記述する場所でもあります。

ルート宣言では、このエンドポイントを /action-0/execute パスで使用できるようにし、3 つのハンドラ(requireInstanceAuthprocessRequestBodyaction0Execute)を関連付けました。

requireInstanceAuth についてはすでに説明しました。processRequestBody ハンドラは、Looker のリクエスト本文の特定の不便なフィールドをより便利な形式にするための、ほとんど面白みのない前処理を提供しますが、完全なコードファイルで参照できます。

action0Execute 関数は、アクション リクエストの複数の部分から有用な情報を抽出する例を示すことから始まります。実際には、コードで formParamsactionParams として参照されるリクエスト要素には、リスティング エンドポイントとフォーム エンドポイントで宣言した内容に応じて、さまざまなフィールドが含まれる可能性があります。

async function action0Execute (req){
    try{
        // Prepare some data that we will insert
        const scheduledPlanId = req.body.scheduled_plan && req.body.scheduled_plan.scheduled_plan_id
        const formParams = req.body.form_params || {}
        const actionParams = req.body.data || {}
        const queryData = req.body.attachment.data //If using a standard "push" action

        /*In case any fields require datatype-specific preparation, check this example:
        https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_proto2.js
        */

        const newRow = {
            invoked_at: new Date(),
            invoked_by: actionParams.email,
            scheduled_plan_id: scheduledPlanId || null,
            query_result_size: queryData.length,
            choice: formParams.choice,
            note: formParams.note,
            }

その後、コードは標準の BigQuery コードに移行し、実際にデータを挿入します。BigQuery Storage Write API には、永続的なストリーミング接続や多くのレコードの一括挿入に適した、より複雑なバリエーションもあります。ただし、Cloud Run 関数のコンテキストで個々のユーザー操作に応答する場合は、これが最も直接的なバリエーションです。

await bigqueryConnectAndAppend(newRow)

...

async function bigqueryConnectAndAppend(row){   
    let writerClient
    try{
        const destinationTablePath = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`
        const streamId = `${destinationTablePath}/streams/_default`
        writerClient = new BQSManagedWriter.WriterClient({projectId})
        const writeMetadata = await writerClient.getWriteStream({
            streamId,
            view: 'FULL',
            })
        const protoDescriptor = BigqueryStorage.adapt.convertStorageSchemaToProto2Descriptor(
            writeMetadata.tableSchema,
            'root'
            )
        const connection = await writerClient.createStreamConnection({
            streamId,
            destinationTablePath,
            })
        const writer = new BQSManagedWriter.JSONWriter({
            streamId,
            connection,
            protoDescriptor,
            })

        let result
        if(row){
            // The API expects an array of rows, so wrap the single row in an array
            const rowsToAppend = [row]
            result = await writer.appendRows(rowsToAppend).getResult()
            }
        return {
            streamId: connection.getStreamId(),
            protoDescriptor,
            result
            }
        }
    catch (e) {throw e}
    finally{
        if(writerClient){writerClient.close()}
        }
    }

デモコードには、トラブルシューティング用の「ステータス」エンドポイントも含まれていますが、このエンドポイントは Action API の統合には必要ありません。

デプロイガイド

最後に、前提条件、Cloud Run functions のデプロイ、BigQuery の構成、Looker の構成など、デモを自分でデプロイするための手順ガイドを提供します。

プロジェクトとサービスの前提条件

具体的な構成を開始する前に、このリストを確認して、ソリューションに必要なサービスとポリシーを把握してください。

  1. 新しいプロジェクト: この例のリソースを格納する新しいプロジェクトが必要です。
  2. サービス: Cloud コンソール UI で BigQuery と Cloud Run functions を初めて使用するときに、BigQuery、Artifact Registry、Cloud Build、Cloud Functions、Cloud Logging、Pub/Sub、Cloud Run Admin、Secret Manager などの必要なサービスに必要な API を有効にするよう求められます。
  3. 未認証の呼び出しのポリシー: このユースケースでは、IAM ではなく Action API に従ってコードで受信リクエストの認証を処理するため、「未認証の呼び出しを許可する」Cloud Run 関数をデプロイする必要があります。デフォルトでは許可されていますが、組織のポリシーでこの使用が制限されていることがよくあります。具体的には、constraints/iam.allowedPolicyMemberDomains ポリシーは IAM 権限を付与できるユーザーを制限します。未認証アクセスに allUsers プリンシパルを許可するには、このポリシーの調整が必要になることがあります。未認証の呼び出しを許可できない場合は、こちらのガイド(ドメインで制限された共有の適用時にパブリック Cloud Run サービスを作成する方法)をご覧ください。
  4. 他のポリシー: 他の Google Cloud 組織のポリシーの制約によって、デフォルトで許可されているサービスがデプロイされない場合もあります。

Cloud Run 関数のデプロイ

新しいプロジェクトを作成したら、次の手順で Cloud Run 関数をデプロイします。

  1. [Cloud Run functions] で、[関数を作成] をクリックします。
  2. 関数の名前は任意です(例: demo-bq-insert-action)。
  3. [トリガー] 設定で次の操作を行います。
    1. トリガー タイプはすでに「HTTPS」になっているはずです。
    2. [認証] を [未認証の呼び出しを許可] に設定します。
    3. [URL] の値をクリップボードにコピーします。
  4. [ランタイム] > [ランタイム環境変数] の設定で、次の操作を行います。
    1. [変数を追加] をクリックします。
    2. 変数名を CALLBACK_URL_PREFIX に設定します。
    3. 前の手順の URL を値として貼り付けます。
  5. [次へ] をクリックします。
  6. package.json ファイルをクリックし、コンテンツを貼り付けます。
  7. index.js ファイルをクリックし、コンテンツを貼り付けます。
  8. ファイルの上部にある projectId 変数を実際のプロジェクト ID に割り当てます。
  9. [エントリ ポイント] を httpHandler に設定します。
  10. [デプロイ] をクリックします。
  11. ビルド サービス アカウントにリクエストされた権限(ある場合)を付与します。
  12. デプロイが完了するまで待ちます。
  13. 以降の手順で、 Google Cloud ログを確認するように指示するエラーが表示された場合は、このページの [ログ] タブからこの関数のログにアクセスできます。
  14. Cloud Run functions のページから移動する前に、[詳細] タブで、関数が持つサービス アカウントを見つけてメモします。これは、後のステップで関数に必要な権限があることを確認するために使用します。
  15. URL にアクセスして、ブラウザで関数デプロイを直接テストします。統合リスティングを含む JSON レスポンスが表示されます。
  16. 403 エラーが表示された場合は、組織のポリシーにより、未認証の呼び出しを許可する試みが失敗した可能性があります。関数で未認証の呼び出しが許可されているかどうかを確認し、組織のポリシー設定を確認して、設定の更新を試みます。

BigQuery の宛先テーブルへのアクセス

実際には、挿入先のテーブルは別の Google Cloud プロジェクトに存在できますが、デモンストレーションのため、同じプロジェクトに新しい宛先テーブルを作成します。いずれの場合も、Cloud Run 関数のサービス アカウントにテーブルへの書き込み権限があることを確認する必要があります。

  1. BigQuery コンソールに移動します。
  2. デモテーブルを作成します。

    1. エクスプローラ バーで、プロジェクトの横にある省略記号メニューを使用して、[データセットを作成] を選択します。
    2. データセットに ID demo_dataset を指定し、[データセットを作成] をクリックします。
    3. 新しく作成したデータセットの省略記号メニューを使用して、[テーブルを作成] を選択します。
    4. テーブルに demo_table という名前を付けます。
    5. [スキーマ] で [テキストとして編集] を選択し、次のスキーマを使用して [テーブルを作成] をクリックします。

      [
       {"name":"invoked_at","type":"TIMESTAMP"},
       {"name":"invoked_by","type":"STRING"},
       {"name":"scheduled_plan_id","type":"STRING"},
       {"name":"query_result_size","type":"INTEGER"},
       {"name":"choice","type":"STRING"},
       {"name":"note","type":"STRING"}
      ]
      
  3. 権限の割り当て:

    1. [エクスプローラ] バーで、データセットをクリックします。
    2. [データセット] ページで、[共有 > 権限] をクリックします。
    3. [プリンシパルを追加] をクリックします。
    4. [新しいプリンシパル] を、このページの先ほどメモした関数のサービス アカウントに設定します。
    5. BigQuery データ編集者ロールを割り当てます。
    6. [保存] をクリックします。

Looker に接続する

関数がデプロイされたので、Looker を関数に接続します。

  1. リクエストが Looker インスタンスから送信されたことを認証するには、アクションの共有シークレットが必要です。長いランダムな文字列を生成して安全に保管します。これは、以降のステップで Looker シークレット値として使用します。
  2. Cloud コンソールで、[Secret Manager] に移動します。
    1. [シークレットの作成] をクリックします。
    2. [名前] を LOOKER_SECRET に設定します。(これはこのデモのコードにハードコードされていますが、独自のコードを使用する場合は、任意の名前を効果的に選択できます)。
    3. [シークレット値] を、生成したシークレット値に設定します。
    4. [シークレットの作成] をクリックします。
    5. [シークレット] ページで、[権限] タブをクリックします。
    6. [アクセス権を付与] をクリックします。
    7. [新しいプリンシパル] を、前にメモした関数のサービス アカウントに設定します。
    8. Secret Manager のシークレット アクセサーのロールを割り当てます。
    9. [保存] をクリックします。
    10. 関数 URL に追加された /status ルートにアクセスして、関数がシークレットに正常にアクセスしていることを確認できます。
  3. Looker インスタンスで次の操作を行います。
    1. [管理] > [プラットフォーム] > [アクション] に移動します。
    2. ページの下部にある [アクション ハブを追加] をクリックします。
    3. 関数の URL(例: https://your-region-your-project.cloudfunctions.net/demo-bq-insert-action)を指定し、[アクション ハブを追加] をクリックして確定します。
    4. Demo BigQuery Insert という名前のアクションが 1 つ含まれる新しいアクション ハブ エントリが表示されます。
    5. アクション ハブ エントリで、[Configure Authorization] をクリックします。
    6. 生成された Looker シークレットを [Authorization Token] フィールドに入力し、[Update Token] をクリックします。
    7. [Demo BigQuery Insert] アクションで、[有効にする] をクリックします。
    8. [有効] スイッチをオンに切り替えます。
    9. アクションのテストが自動的に実行され、関数が Looker のリクエストを受け入れ、フォーム エンドポイントに正しく応答していることが確認されます。
    10. [保存] をクリックします。

エンドツーエンド テスト

これで、新しいアクションを実際に使用できるようになりました。このアクションは任意のクエリで動作するように構成されているため、任意の Explore(組み込みのシステム アクティビティ Explore など)を選択し、新しいクエリにフィールドを追加して実行し、歯車メニューから [送信] を選択します。アクションが利用可能なデスティネーションの 1 つとして表示され、いくつかのフィールド入力が求められます。

新しいアクションが選択された Looker の [送信] モーダルのスクリーンショット

[送信] を押すと、BigQuery テーブルに新しい行が挿入され、Looker ユーザー アカウントのメールアドレスが invoked_by 列に表示されます。