Cloud Run functions の関数で Looker アクションを使用して BigQuery に書き戻す

Looker の多くのお客様は、データ ウェアハウス内のデータのレポート作成だけでなく、そのデータ ウェアハウスへの書き込みと更新をユーザーが行えるようにしたいと考えています。

Looker は、Action API を通じて、任意のデータウェアハウスまたは宛先でこのユースケースをサポートしています。このドキュメント ページでは、 Google Cloud インフラストラクチャを使用するお客様が、Cloud Run 関数にソリューションをデプロイして BigQuery に書き戻す方法について説明します。このページでは、次のトピックについて説明します。

ソリューションの考慮事項

次の考慮事項を参照して、このソリューションがニーズに合っていることを確認してください。

  • Cloud Run 関数
    • Cloud Run functions を選ぶ理由Google のサーバーレス サービスである Cloud Run 関数は、運用とメンテナンスの容易さから、優れた選択肢です。考慮すべき点として、特にコールド呼び出しの場合、レイテンシが専用サーバーに依存するソリューションよりも長くなる可能性があることが挙げられます。
    • 言語とランタイム Cloud Run functions は、複数の言語とランタイムをサポートしています。このドキュメント ページでは、JavaScript と Node.js の例について説明します。ただし、これらのコンセプトは、サポートされている他の言語とランタイムに直接変換できます。
  • BigQuery
    • BigQuery を選択する理由このドキュメント ページでは、BigQuery をすでに使用していることを前提としていますが、BigQuery は一般的なデータ ウェアハウスに適しています。次の点にご注意ください。
      • BigQuery Storage Write API: BigQuery には、SQL ベースのジョブのデータ操作言語(DML)ステートメントなど、データ ウェアハウス内のデータの更新に使用できる複数のインターフェースが用意されています。ただし、大量の書き込みには BigQuery Storage Write API が最適です。
      • 更新ではなく追加: このソリューションでは行を更新するのではなく追加するだけですが、追加専用ログからクエリ時に「現在の状態」テーブルをいつでも導出できるため、更新をシミュレートできます。
  • サポート サービス
    • Secret Manager: Secret Manager は、シークレット値を保持し、関数の構成に直接保存するなど、アクセスが容易な場所に保存されないようにします。
    • Identity and Access Management(IAM): IAM は、関数に必要なシークレットへの実行時のアクセスと、目的の BigQuery テーブルへの書き込みを許可します。
    • Cloud Build: このページでは Cloud Build について詳しく説明しませんが、Cloud Run functions はバックグラウンドで Cloud Build を使用します。Cloud Build を使用すると、Git リポジトリのソースコードの変更から関数への継続的デプロイ アップデートを自動化できます。
  • アクションとユーザー認証
    • Cloud Run サービス アカウント Looker アクションを使用して組織独自のファーストパーティ アセットとリソースと統合する主な方法は、Looker Action API のトークンベースの認証メカニズムを使用して、Looker インスタンスから送信されたリクエストを認証し、サービス アカウントを使用して BigQuery のデータを更新する関数を承認することです。
    • OAuth: このページでは説明していませんが、Looker Action API の OAuth 機能を使用する方法もあります。このアプローチは複雑で、通常は必要ありませんが、Looker または関数コード内のアドホック ロジックでエンドユーザーのアクセス権を使用するのではなく、IAM を使用してテーブルへの書き込みを行うエンドユーザーのアクセス権を定義する必要がある場合に使用できます。

デモコードのチュートリアル

デモ アクションのロジック全体を含む単一のファイルは、GitHub で入手できます。このセクションでは、コードの主要な要素について説明します。

セットアップ コード

最初のセクションには、アクションが書き込むテーブルを識別するデモ定数がいくつかあります。このページの下部にあるデプロイガイドでは、プロジェクト ID を独自の ID に置き換える手順について説明します。これは、コードに必要な唯一の変更です。

/*** Demo constants */
const projectId = "your-project-id"
const datasetId = "demo_dataset"
const tableId = "demo_table"

次のセクションでは、アクションで使用するコード依存関係を宣言して初期化します。Secret Manager Node.js モジュールを使用して Secret Manager に「コード内で」アクセスする例を示します。ただし、Cloud Run 関数の組み込み機能を使用して初期化時にシークレットを取得することで、このコード依存関係を排除することもできます。

/*** Code Dependencies ***/
const crypto = require("crypto")
const {SecretManagerServiceClient} = require('@google-cloud/secret-manager')
const secrets = new SecretManagerServiceClient()
const BigqueryStorage = require('@google-cloud/bigquery-storage')
const BQSManagedWriter = BigqueryStorage.managedwriter

参照される @google-cloud 依存関係は、依存関係をプリロードして Node.js ランタイムで使用できるように、package.json ファイルでも宣言されています。crypto は組み込みの Node.js モジュールであり、package.json で宣言されていません。

HTTP リクエストの処理とルーティング

コードが Cloud Run 関数ランタイムに公開するメイン インターフェースは、Node.js Express ウェブサーバーのコンベンションに従うエクスポートされた JavaScript 関数です。特に、関数は 2 つの引数を受け取ります。1 つ目は HTTP リクエストを表し、さまざまなリクエスト パラメータと値を読み取ることができます。2 つ目はレスポンス オブジェクトを表し、レスポンス データを出力します。関数の名前は任意ですが、デプロイガイドのセクションで説明するように、後で Cloud Run functions に名前を指定する必要があります。

/*** Entry-point for requests ***/
exports.httpHandler = async function httpHandler(req,res) {

httpHandler 関数の最初のセクションでは、アクションが認識するさまざまなルートを宣言します。これは、単一のアクションに必要な Action API エンドポイントと、各ルートを処理する関数(ファイルの後半で定義)を忠実に反映しています。

一部のアクションと Cloud Run functions の例では、このようなルートごとに個別の関数をデプロイして、Cloud Run functions のデフォルトのルーティングと 1 対 1 で対応させていますが、関数は、ここで示すように、コード内で追加の「サブルーティング」を適用できます。これは最終的には好みの問題ですが、コード内でこの追加のルーティングを行うと、デプロイする関数の数が最小限に抑えられ、すべてのアクションのエンドポイントで単一の一貫したコード状態を維持できます。

    const routes = {
        "/": [hubListing],
        "/status": [hubStatus], // Debugging endpoint. Not required.
        "/action-0/form": [
            requireInstanceAuth,
            action0Form
            ], 
        "/action-0/execute": [
            requireInstanceAuth,
            processRequestBody,
            action0Execute
            ]
        }

HTTP ハンドラ関数の残りの部分は、上記のルート宣言に対する HTTP リクエストの処理を実装し、それらのハンドラからの戻り値をレスポンス オブジェクトに接続します。

    try {
        const routeHandlerSequence = routes[req.path] || [routeNotFound]
        for(let handler of routeHandlerSequence) {
            let handlerResponse = await handler(req)
            if (!handlerResponse) continue 
            return res
                .status(handlerResponse.status || 200)
                .json(handlerResponse.body || handlerResponse)
            }
        }
    catch(err) {
        console.error(err)
        res.status(500).json("Unhandled error. See logs for details.")
        }
    }

HTTP ハンドラとルートの宣言が完了したので、実装する必要がある 3 つの主要なアクション エンドポイントについて詳しく説明します。

アクション リスト エンドポイント

Looker 管理者が Looker インスタンスをアクション サーバーに初めて接続すると、Looker は指定された URL(「アクション リスト エンドポイント」)を呼び出して、サーバーで利用可能なアクションに関する情報を取得します。

前述のルート宣言では、このエンドポイントを関数の URL のルートパス(/)で利用できるようにし、hubListing 関数によって処理されることを指定しました。

次の関数定義からわかるように、この関数には「コード」がほとんどありません。毎回同じ JSON データを返すだけです。注意点として、一部のフィールドに「独自の」URL が動的に含まれるため、Looker インスタンスは後続のリクエストを同じ関数に送り返すことができます。

async function hubListing(req){
    return {
        integrations: [
            {
                name: "demo-bq-insert",
                label: "Demo BigQuery Insert",
                supported_action_types: ["cell", "query", "dashboard"],
                form_url:`${process.env.CALLBACK_URL_PREFIX}/action-0/form`,
                url: `${process.env.CALLBACK_URL_PREFIX}/action-0/execute`,
                icon_data_uri: "data:image/png;base64,...",
                supported_formats:["inline_json"],
                supported_formattings:["unformatted"],
                required_fields:[
                    // You can use this to make your action available
                    // for specific queries/fields
                    // {tag:"user_id"}
                    ],
                params: [
                    // You can use this to require parameters, either
                    // from the Action's administrative configuration,
                    // or from the invoking user's user attributes. 
                    // A common use case might be to have the Looker
                    // instance pass along the user's identification to
                    // allow you to conditionally authorize the action:
                    {name: "email", label: "Email", user_attribute_name: "email", required: true}
                    ]
                }
            ]
        }
    }

デモ用に、このリスティングを取得するために認証を必要としないコードになっています。ただし、アクション メタデータが機密情報であると思われる場合は、次のセクションで説明するように、このルートの認証を必須にすることもできます。

また、Cloud Run 関数は複数のアクションを公開して処理できるため、/action-X/... のルート規則が使用されています。ただし、このデモの Cloud Run 関数では、1 つのアクションのみを実装します。

アクション フォーム エンドポイント

すべてのユースケースでフォームが必要になるわけではありませんが、データベースへの書き戻しのユースケースにはフォームが適しています。ユーザーは Looker でデータを検査し、データベースに挿入する値を指定できます。アクション リストで form_url パラメータが指定されているため、ユーザーがアクションの操作を開始すると、Looker はこのアクション フォーム エンドポイントを呼び出して、ユーザーから収集する追加データを決定します。

ルート宣言で、このエンドポイントを /action-0/form パスで利用できるようにし、requireInstanceAuthaction0Form の 2 つのハンドラを関連付けました。

一部のロジックは複数のエンドポイントで再利用できるため、このような複数のハンドラを許可するようにルート宣言を設定しています。

たとえば、requireInstanceAuth が複数のルートで使用されていることがわかります。このハンドラは、リクエストが Looker インスタンスから送信されたことを確認する必要がある場合に使用します。ハンドラは Secret Manager からシークレットの期待されるトークン値を取得し、その期待されるトークン値が含まれていないリクエストを拒否します。

async function requireInstanceAuth(req) {
    const lookerSecret = await getLookerSecret()
    if(!lookerSecret){return}
    const expectedAuthHeader = `Token token="${lookerSecret}"`
    if(!timingSafeEqual(req.headers.authorization,expectedAuthHeader)){
        return {
            status:401,
            body: {error: "Looker instance authentication is required"}
            }
        }
    return

    function timingSafeEqual(a, b) {
        if(typeof a !== "string"){return}
        if(typeof b !== "string"){return}
        var aLen = Buffer.byteLength(a)
        var bLen = Buffer.byteLength(b)
        const bufA = Buffer.allocUnsafe(aLen)
        bufA.write(a)
        const bufB = Buffer.allocUnsafe(aLen) //Yes, aLen
        bufB.write(b)

        return crypto.timingSafeEqual(bufA, bufB) && aLen === bLen;
        }
    }

標準の等価性チェック(==)ではなく timingSafeEqual 実装を使用していることに注意してください。これは、攻撃者がシークレットの値をすばやく見つけることができるサイドチャネルのタイミング情報を漏洩させないためです。

リクエストがインスタンス認証チェックに合格すると、リクエストは action0Form ハンドラによって処理されます。

async function action0Form(req){
    return [
        {name: "choice",  label: "Choose", type:"select", options:[
            {name:"Yes", label:"Yes"},
            {name:"No", label:"No"},
            {name:"Maybe", label:"Maybe"}
            ]},
        {name: "note", label: "Note", type: "textarea"}
        ]
    }

このデモの例は非常に静的ですが、特定のユースケースではフォーム コードをよりインタラクティブにすることができます。たとえば、最初のプルダウンでユーザーが選択した内容に応じて、表示されるフィールドが異なる場合があります。

アクション実行エンドポイント

アクション実行エンドポイントには、アクションのロジックの大部分が存在します。ここでは、BigQuery 挿入ユースケースに固有のロジックについて説明します。

ルート宣言で、このエンドポイントを /action-0/execute パスで利用できるようにし、requireInstanceAuthprocessRequestBodyaction0Execute の 3 つのハンドラを関連付けました。

requireInstanceAuth についてはすでに説明しました。processRequestBody ハンドラは、Looker のリクエスト本文内の特定の不便なフィールドをより便利な形式に変換するための、ほとんど興味のない前処理を提供します。このハンドラについては、完全なコードファイルで確認できます。

action0Execute 関数は、アクション リクエストのいくつかの部分から有用な情報を抽出する例を示します。実際には、コードで formParamsactionParams と参照しているリクエスト要素には、リスティング エンドポイントとフォーム エンドポイントで宣言した内容に応じて異なるフィールドが含まれる場合があります。

async function action0Execute (req){
    try{
        // Prepare some data that we will insert
        const scheduledPlanId = req.body.scheduled_plan && req.body.scheduled_plan.scheduled_plan_id
        const formParams = req.body.form_params || {}
        const actionParams = req.body.data || {}
        const queryData = req.body.attachment.data //If using a standard "push" action

        /*In case any fields require datatype-specific preparation, check this example:
        https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_proto2.js
        */

        const newRow = {
            invoked_at: new Date(),
            invoked_by: actionParams.email,
            scheduled_plan_id: scheduledPlanId || null,
            query_result_size: queryData.length,
            choice: formParams.choice,
            note: formParams.note,
            }

その後、コードは標準の BigQuery コードに移行し、データを実際に挿入します。BigQuery Storage Write API には、永続的なストリーミング接続や多数のレコードの一括挿入に適した、より複雑なバリエーションも用意されていますが、Cloud Run 関数のコンテキストで個々のユーザー操作に応答する場合は、これが最も直接的なバリエーションです。

await bigqueryConnectAndAppend(newRow)

...

async function bigqueryConnectAndAppend(row){   
    let writerClient
    try{
        const destinationTablePath = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`
        const streamId = `${destinationTablePath}/streams/_default`
        writerClient = new BQSManagedWriter.WriterClient({projectId})
        const writeMetadata = await writerClient.getWriteStream({
            streamId,
            view: 'FULL',
            })
        const protoDescriptor = BigqueryStorage.adapt.convertStorageSchemaToProto2Descriptor(
            writeMetadata.tableSchema,
            'root'
            )
        const connection = await writerClient.createStreamConnection({
            streamId,
            destinationTablePath,
            })
        const writer = new BQSManagedWriter.JSONWriter({
            streamId,
            connection,
            protoDescriptor,
            })

        let result
        if(row){
            // The API expects an array of rows, so wrap the single row in an array
            const rowsToAppend = [row]
            result = await writer.appendRows(rowsToAppend).getResult()
            }
        return {
            streamId: connection.getStreamId(),
            protoDescriptor,
            result
            }
        }
    catch (e) {throw e}
    finally{
        if(writerClient){writerClient.close()}
        }
    }

デモコードには、トラブルシューティング用の「status」エンドポイントも含まれていますが、このエンドポイントは Action API の統合には必要ありません。

導入ガイド

最後に、デモを自分でデプロイするための手順ガイドを紹介します。前提条件、Cloud Run 関数のデプロイ、BigQuery の構成、Looker の構成について説明します。

プロジェクトとサービスの前提条件

具体的な構成を開始する前に、このリストでソリューションに必要なサービスとポリシーを確認してください。

  1. 新しいプロジェクト: サンプルのリソースを格納する新しいプロジェクトが必要です。
  2. サービス: Cloud コンソール UI で BigQuery 関数と Cloud Run 関数を初めて使用するときに、BigQuery、Artifact Registry、Cloud Build、Cloud Functions、Cloud Logging、Pub/Sub、Cloud Run Admin、Secret Manager などの必要なサービスに必要な API を有効にするように求められます。
  3. 未認証の呼び出しに関するポリシー: このユースケースでは、IAM を使用するのではなく、Action API に従ってコード内の受信リクエストの認証を処理するため、「未認証の呼び出しを許可する」Cloud Run functions をデプロイする必要があります。これはデフォルトで許可されていますが、組織のポリシーでこの使用が制限されていることがよくあります。具体的には、constraints/iam.allowedPolicyMemberDomains ポリシーは IAM 権限を付与できるユーザーを制限します。allUsers プリンシパルに未認証のアクセスを許可するように調整する必要があります。未認証の呼び出しを許可できない場合は、ドメインで制限された共有の適用時にパブリック Cloud Run サービスを作成する方法をご覧ください。
  4. その他のポリシー: 他の Google Cloud 組織のポリシーの制約によって、デフォルトで許可されているサービスのデプロイがブロックされる場合もあります。

Cloud Run 関数のデプロイ

新しいプロジェクトを作成したら、次の手順で Cloud Run 関数をデプロイします。

  1. [Cloud Run functions] で、[関数を作成] をクリックします。
  2. 関数の名前を選択します(例:「demo-bq-insert-action」)。
  3. [トリガー] の設定で、次の操作を行います。
    1. トリガーのタイプはすでに「HTTPS」になっているはずです。
    2. [認証] を [未認証の呼び出しを許可] に設定します。
    3. [URL] の値をクリップボードにコピーします。
  4. [Runtime > Runtime environment variables] で、次の設定を行います。
    1. [変数を追加] をクリックします。
    2. 変数名を CALLBACK_URL_PREFIX に設定します。
    3. 前の手順の URL を値として貼り付けます。
  5. [次へ] をクリックします。
  6. package.json ファイルをクリックし、内容を貼り付けます。
  7. index.js ファイルをクリックし、内容を貼り付けます。
  8. ファイルの上部にある projectId 変数を独自のプロジェクト ID に割り当てます。
  9. [エントリ ポイント] を httpHandler に設定します。
  10. [デプロイ] をクリックします。
  11. リクエストされた権限(ある場合)をビルド サービス アカウントに付与します。
  12. デプロイが完了するまで待ちます。
  13. 以降の手順で、ログを確認するよう求めるエラーが表示された場合は、このページの [ログ] タブからこの関数のログにアクセスできます。 Google Cloud
  14. Cloud Run 関数のページから移動する前に、[詳細] タブで、関数に割り当てられているサービス アカウントを見つけてメモします。後で、関数に必要な権限があることを確認するために使用します。
  15. URL にアクセスして、ブラウザで直接関数のデプロイをテストします。統合リスティングを含む JSON レスポンスが表示されます。
  16. 403 エラーが発生した場合は、組織のポリシーが原因で [未認証の呼び出しを許可する] の設定が失敗している可能性があります。関数が未認証の呼び出しを許可しているかどうかを確認し、組織のポリシー設定を確認して、設定を更新してみてください。

BigQuery の宛先テーブルへのアクセス

実際には、挿入先の宛先テーブルは別のプロジェクトに配置できますが、デモ用に同じプロジェクトに新しい宛先テーブルを作成します。 Google Cloud どちらの場合も、Cloud Run 関数のサービス アカウントにテーブルへの書き込み権限があることを確認する必要があります。

  1. BigQuery コンソールに移動します。
  2. デモテーブルを作成します。

    1. エクスプローラ バーで、プロジェクトの横にある省略記号メニューを使用して、[データセットを作成] を選択します。
    2. データセットに ID demo_dataset を指定して、[データセットを作成] をクリックします。
    3. 新しく作成したデータセットの省略記号メニューを使用して、[テーブルを作成] を選択します。
    4. テーブルに demo_table という名前を付けます。
    5. [スキーマ] で [テキストとして編集] を選択し、次のスキーマを使用して [テーブルを作成] をクリックします。

      [
       {"name":"invoked_at","type":"TIMESTAMP"},
       {"name":"invoked_by","type":"STRING"},
       {"name":"scheduled_plan_id","type":"STRING"},
       {"name":"query_result_size","type":"INTEGER"},
       {"name":"choice","type":"STRING"},
       {"name":"note","type":"STRING"}
      ]
      
  3. 権限の割り当て:

    1. [エクスプローラ] バーで、データセットをクリックします。
    2. [データセット] ページで、[共有] > [権限] をクリックします。
    3. [プリンシパルを追加] をクリックします。
    4. [新しいプリンシパル] を、このページで前述の関数の Service Account に設定します。
    5. BigQuery データ編集者のロールを割り当てます。
    6. [保存] をクリックします。

Looker に接続する

関数がデプロイされたので、Looker を関数に接続します。

  1. アクションが Looker インスタンスからのリクエストであることを認証するために、共有シークレットが必要です。長いランダムな文字列を生成し、安全に保管します。この値は、後続のステップで Looker シークレットの値として使用します。
  2. Cloud コンソールで、[Secret Manager] に移動します。
    1. [シークレットの作成] をクリックします。
    2. [名前] を LOOKER_SECRET に設定します。(このデモではコードにハードコードされていますが、独自のコードを使用する場合は任意の名前を選択できます)。
    3. [Secret Value] を、生成したシークレット値に設定します。
    4. [シークレットの作成] をクリックします。
    5. [シークレット] ページで、[権限] タブをクリックします。
    6. [アクセス権を付与] をクリックします。
    7. [新しいプリンシパル] を、前にメモした関数のサービス アカウントに設定します。
    8. Secret Manager のシークレット アクセサーのロールを割り当てます。
    9. [保存] をクリックします。
    10. 関数がシークレットに正常にアクセスしていることを確認するには、関数の URL に追加された /status ルートにアクセスします。
  3. Looker インスタンスで、次の操作を行います。
    1. [管理] > [プラットフォーム] > [アクション] に移動します。
    2. ページの下部にある [アクション ハブを追加] をクリックします。
    3. 関数の URL(例: https://your-region-your-project.cloudfunctions.net/demo-bq-insert-action)を指定し、[アクション ハブを追加] をクリックして確認します。
    4. 新しいアクション ハブ エントリが作成され、[Demo BigQuery Insert] という名前のアクションが 1 つ表示されます。
    5. アクション ハブのエントリで、[承認を構成] をクリックします。
    6. 生成された Looker Secret を [認証トークン] フィールドに入力し、[トークンを更新] をクリックします。
    7. [Demo BigQuery Insert] アクションで、[有効にする] をクリックします。
    8. [有効] スイッチをオンに切り替えます。
    9. アクションのテストが自動的に実行され、関数が Looker のリクエストを受け入れ、フォーム エンドポイントに正しく応答していることを確認します。
    10. [保存] をクリックします。

エンドツーエンド テスト

これで、新しいアクションを実際に使用できるようになります。このアクションは任意のクエリで動作するように構成されているため、任意の Explore(組み込みのシステム アクティビティ Explore など)を選択し、新しいクエリにフィールドを追加して実行し、歯車メニューから [送信] を選択します。アクションが利用可能なデスティネーションの 1 つとして表示され、フィールドの入力を求めるメッセージが表示されます。

新しいアクションが選択された Looker の [送信] モーダルのスクリーンショット

[送信] をクリックすると、BigQuery テーブルに新しい行が挿入され(invoked_by 列に Looker ユーザー アカウントのメールアドレスが識別されます)、