使用 Cloud Run 函数中的 Looker 操作进行 BigQuery 回写

许多 Looker 客户希望让用户不仅能够生成数据仓库中数据的报告,还能实际写回并更新该数据仓库。

通过其 Action API,Looker 支持针对任何数据仓库或目标位置实现此用例。本文档页面将引导使用 Google Cloud 基础架构的客户在 Cloud Run 函数上部署解决方案,以便写回 BigQuery。本页将介绍以下主题:

解决方案注意事项

请参考以下注意事项列表,验证此解决方案是否符合您的需求。

  • Cloud Run 函数
    • 为何选择 Cloud Run 函数?作为 Google 的“无服务器”产品,Cloud Run 函数非常适合实现轻松的运维和维护。请注意,与依赖于专用服务器的解决方案相比,延迟时间(尤其是冷启动调用)可能会更长。
    • 语言和运行时:Cloud Run 函数支持多种语言和运行时。本文档页面将重点介绍 JavaScript 和 Node.js 中的示例。不过,这些概念可以直接转换为其他受支持的语言和运行时。
  • BigQuery
    • 为何选择 BigQuery?虽然本文档页面假定您已在使用 BigQuery,但一般来说,BigQuery 是数据仓库的绝佳选择。请注意以下注意事项:
      • BigQuery Storage Write API:BigQuery 提供了多个用于更新数据仓库中数据的接口,例如基于 SQL 的作业中的数据操纵语言 (DML) 语句。不过,对于大批量写入,最佳选项是 BigQuery Storage Write API
      • 附加而非更新:即使此解决方案只会附加行,而不会更新行,您也可以随时在查询时从只附加日志派生“当前状态”表,从而模拟更新。
  • 支持服务
    • Secret ManagerSecret Manager 会保留 Secret 值,以确保这些值不会存储在过于公开的位置(例如直接存储在函数的配置中)。
    • Identity and Access Management (IAM)IAM 授权函数在运行时访问必要的 Secret,并写入预期的 BigQuery 表。
    • Cloud Build:虽然本页不会详细介绍 Cloud Build,但 Cloud Run 函数会在后台使用它。您可以使用 Cloud Build 自动根据 Git 代码库中的源代码更改持续部署函数更新。
  • 操作和用户身份验证
    • Cloud Run 服务账号:使用 Looker 操作与贵组织自己的第一方素材资源和资源集成的主要方法也是最简单的方法,就是使用 Looker Action API 基于令牌的身份验证机制对请求进行身份验证(确认请求来自您的 Looker 实例),然后授权该函数使用服务账号更新 BigQuery 中的数据。
    • OAuth:本页未介绍的另一种方法是使用 Looker Action API 的 OAuth 功能。这种方法更复杂,通常不需要使用,但如果您需要使用 IAM 定义最终用户对表的写入权限,而不是在 Looker 中使用他们的权限或在函数代码中使用临时逻辑,则可以使用此方法。

演示代码演示

我们有一个文件包含演示操作的完整逻辑,可在 GitHub 上找到。在本部分中,我们将详细介绍该代码的关键元素。

设置代码

第一个部分包含一些演示常量,用于标识操作将写入的表。在本页下文的部署指南部分中,您将会看到将项目 ID 替换为您自己的说明,这是对代码进行的唯一必要修改。

/*** Demo constants */
const projectId = "your-project-id"
const datasetId = "demo_dataset"
const tableId = "demo_table"

下一部分声明并初始化了您的操作将使用的几个代码依赖项。我们提供了一个示例,该示例使用 Secret Manager Node.js 模块“在代码中”访问 Secret Manager;不过,您也可以使用 Cloud Run 函数的内置功能在初始化期间为您检索密钥,从而消除此代码依赖项。

/*** Code Dependencies ***/
const crypto = require("crypto")
const {SecretManagerServiceClient} = require('@google-cloud/secret-manager')
const secrets = new SecretManagerServiceClient()
const BigqueryStorage = require('@google-cloud/bigquery-storage')
const BQSManagedWriter = BigqueryStorage.managedwriter

请注意,我们还在 package.json 文件中声明了引用的 @google-cloud 依赖项,以便预加载这些依赖项并将其提供给 Node.js 运行时。crypto 是一个内置的 Node.js 模块,未在 package.json 中声明。

HTTP 请求处理和路由

您的代码向 Cloud Run 函数运行时公开的主要接口是一个遵循 Node.js Express Web 服务器惯例的导出 JavaScript 函数。具体而言,您的函数会收到两个参数:第一个参数表示 HTTP 请求,您可以从中读取各种请求参数和值;第二个参数表示响应对象,您可以向其发出响应数据。虽然函数的名称可以是您想要的任何名称,但您稍后必须向 Cloud Run functions 提供该名称,如部署指南部分所详述。

/*** Entry-point for requests ***/
exports.httpHandler = async function httpHandler(req,res) {

httpHandler 函数的第一部分声明了我们的操作将识别的各种路由,这些路由与单个操作的 Action API 所需的端点非常相似,并且该部分还声明了将处理每个路由的函数(该函数在文件的后面部分定义)。

虽然某些操作 + Cloud Run 函数示例会为每条此类路由部署单独的函数,以便与 Cloud Run 函数的默认路由一对一对应,但函数能够在其代码中应用额外的“子路由”,如这里所示。这最终取决于个人偏好,但在代码中执行此额外路由可最大限度地减少我们必须部署的函数数量,并有助于我们在所有操作的端点中维护单一一致的代码状态。

    const routes = {
        "/": [hubListing],
        "/status": [hubStatus], // Debugging endpoint. Not required.
        "/action-0/form": [
            requireInstanceAuth,
            action0Form
            ], 
        "/action-0/execute": [
            requireInstanceAuth,
            processRequestBody,
            action0Execute
            ]
        }

HTTP 处理程序函数的其余部分会根据上述路由声明实现 HTTP 请求的处理,并将这些处理程序的返回值连接到响应对象。

    try {
        const routeHandlerSequence = routes[req.path] || [routeNotFound]
        for(let handler of routeHandlerSequence) {
            let handlerResponse = await handler(req)
            if (!handlerResponse) continue 
            return res
                .status(handlerResponse.status || 200)
                .json(handlerResponse.body || handlerResponse)
            }
        }
    catch(err) {
        console.error(err)
        res.status(500).json("Unhandled error. See logs for details.")
        }
    }

完成 HTTP 处理程序和路线声明后,我们将深入探讨必须实现的三个主要操作端点:

操作列表端点

当 Looker 管理员首次将 Looker 实例连接到 Action 服务器时,Looker 会调用所提供的网址(称为“操作列表端点”),以获取有关通过该服务器可执行的操作的信息。

在之前显示的路线声明中,我们在函数网址下的根路径 (/) 中提供了此端点,并指明它将由 hubListing 函数处理。

从以下函数定义可以看出,它没有太多的“代码”,只会每次返回相同的 JSON 数据。需要注意的是,它会将自己的网址动态添加到某些字段中,以便 Looker 实例将后续请求发回到同一函数。

async function hubListing(req){
    return {
        integrations: [
            {
                name: "demo-bq-insert",
                label: "Demo BigQuery Insert",
                supported_action_types: ["cell", "query", "dashboard"],
                form_url:`${process.env.CALLBACK_URL_PREFIX}/action-0/form`,
                url: `${process.env.CALLBACK_URL_PREFIX}/action-0/execute`,
                icon_data_uri: "data:image/png;base64,...",
                supported_formats:["inline_json"],
                supported_formattings:["unformatted"],
                required_fields:[
                    // You can use this to make your action available
                    // for specific queries/fields
                    // {tag:"user_id"}
                    ],
                params: [
                    // You can use this to require parameters, either
                    // from the Action's administrative configuration,
                    // or from the invoking user's user attributes. 
                    // A common use case might be to have the Looker
                    // instance pass along the user's identification to
                    // allow you to conditionally authorize the action:
                    {name: "email", label: "Email", user_attribute_name: "email", required: true}
                    ]
                }
            ]
        }
    }

出于演示目的,我们的代码无需身份验证即可检索此商家信息。不过,如果您认为操作元数据属于敏感信息,也可以要求对此路线进行身份验证,如下一部分所示。

另请注意,我们的 Cloud Run 函数可以公开和处理多个操作,这说明了我们采用 /action-X/... 的路由惯例。不过,我们的 Cloud Run 函数演示只会实现一项操作。

操作表单端点

虽然并非所有用例都需要表单,但表单非常适合数据库回写用例,因为用户可以在 Looker 中检查数据,然后提供要插入数据库的值。由于我们的 Action 列表提供了 form_url 参数,因此当用户开始与您的 Action 互动时,Looker 会调用此 Action 表单端点,以确定要从用户捕获哪些其他数据。

在路由声明中,我们在 /action-0/form 路径下提供了此端点,并与两个处理程序(requireInstanceAuthaction0Form)相关联。

我们设置路由声明是为了允许有多个这样的处理脚本,因为某些逻辑可以用于多个端点。

例如,我们可以看到 requireInstanceAuth 用于多个路线。每当我们需要要求请求必须来自我们的 Looker 实例时,都会使用此处理脚本。该处理脚本会从 Secret Manager 检索预期的 Secret 令牌值,并拒绝不具有该预期令牌值的任何请求。

async function requireInstanceAuth(req) {
    const lookerSecret = await getLookerSecret()
    if(!lookerSecret){return}
    const expectedAuthHeader = `Token token="${lookerSecret}"`
    if(!timingSafeEqual(req.headers.authorization,expectedAuthHeader)){
        return {
            status:401,
            body: {error: "Looker instance authentication is required"}
            }
        }
    return

    function timingSafeEqual(a, b) {
        if(typeof a !== "string"){return}
        if(typeof b !== "string"){return}
        var aLen = Buffer.byteLength(a)
        var bLen = Buffer.byteLength(b)
        const bufA = Buffer.allocUnsafe(aLen)
        bufA.write(a)
        const bufB = Buffer.allocUnsafe(aLen) //Yes, aLen
        bufB.write(b)

        return crypto.timingSafeEqual(bufA, bufB) && aLen === bLen;
        }
    }

请注意,我们使用的是 timingSafeEqual 实现,而不是标准的等式检查 (==),以防止泄露旁道时间信息,否则攻击者可能会快速找出密钥的值。

假设请求通过了实例身份验证检查,则该请求将由 action0Form 处理程序处理。

async function action0Form(req){
    return [
        {name: "choice",  label: "Choose", type:"select", options:[
            {name:"Yes", label:"Yes"},
            {name:"No", label:"No"},
            {name:"Maybe", label:"Maybe"}
            ]},
        {name: "note", label: "Note", type: "textarea"}
        ]
    }

虽然我们的演示示例非常静态,但对于某些用例,表单代码可以更加交互。例如,系统可能会根据用户在初始下拉菜单中进行的选择,显示不同的字段。

操作执行端点

Action Execute 端点是任何操作的大部分逻辑所在的位置,我们将在此处介绍特定于 BigQuery 插入用例的逻辑。

在路由声明中,我们在 /action-0/execute 路径下提供了此端点,并与三个处理程序相关联:requireInstanceAuthprocessRequestBodyaction0Execute

我们已经介绍了 requireInstanceAuthprocessRequestBody 处理程序提供的预处理操作大多不太重要,只是为了将 Looker 请求正文中某些不方便使用的字段转换为更方便的格式,但您可以在完整代码文件中参阅它。

action0Execute 函数首先会显示从操作请求的多个部分提取可能有用信息的示例。在实践中,请注意,我们的代码中称为 formParamsactionParams 的请求元素可能包含不同的字段,具体取决于您在商家信息和表单端点中声明的内容。

async function action0Execute (req){
    try{
        // Prepare some data that we will insert
        const scheduledPlanId = req.body.scheduled_plan && req.body.scheduled_plan.scheduled_plan_id
        const formParams = req.body.form_params || {}
        const actionParams = req.body.data || {}
        const queryData = req.body.attachment.data //If using a standard "push" action

        /*In case any fields require datatype-specific preparation, check this example:
        https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_proto2.js
        */

        const newRow = {
            invoked_at: new Date(),
            invoked_by: actionParams.email,
            scheduled_plan_id: scheduledPlanId || null,
            query_result_size: queryData.length,
            choice: formParams.choice,
            note: formParams.note,
            }

然后,代码会转换为一些标准 BigQuery 代码,以实际插入数据。请注意,BigQuery Storage Write API 还提供其他更复杂的变体,更适合持久流式连接或大量插入许多记录;但对于在 Cloud Run 函数上下文中响应个别用户互动,这是最直接的变体。

await bigqueryConnectAndAppend(newRow)

...

async function bigqueryConnectAndAppend(row){   
    let writerClient
    try{
        const destinationTablePath = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`
        const streamId = `${destinationTablePath}/streams/_default`
        writerClient = new BQSManagedWriter.WriterClient({projectId})
        const writeMetadata = await writerClient.getWriteStream({
            streamId,
            view: 'FULL',
            })
        const protoDescriptor = BigqueryStorage.adapt.convertStorageSchemaToProto2Descriptor(
            writeMetadata.tableSchema,
            'root'
            )
        const connection = await writerClient.createStreamConnection({
            streamId,
            destinationTablePath,
            })
        const writer = new BQSManagedWriter.JSONWriter({
            streamId,
            connection,
            protoDescriptor,
            })

        let result
        if(row){
            // The API expects an array of rows, so wrap the single row in an array
            const rowsToAppend = [row]
            result = await writer.appendRows(rowsToAppend).getResult()
            }
        return {
            streamId: connection.getStreamId(),
            protoDescriptor,
            result
            }
        }
    catch (e) {throw e}
    finally{
        if(writerClient){writerClient.close()}
        }
    }

演示版代码还包含一个“状态”端点,用于进行问题排查,但 Action API 集成不需要此端点。

部署指南

最后,我们将提供分步指南,指导您自行部署演示,其中涵盖前提条件、Cloud Run 函数部署、BigQuery 配置和 Looker 配置。

项目和服务前提条件

在开始配置任何具体设置之前,请查看此列表,了解该解决方案需要哪些服务和政策:

  1. 一个新项目:您需要一个新项目来存放本例中的资源。
  2. 服务:首次在 Cloud 控制台界面中使用 BigQuery 和 Cloud Run 函数时,系统会提示您为必要服务启用所需的 API,包括 BigQuery、Artifact Registry、Cloud Build、Cloud Functions、Cloud Logging、Pub/Sub、Cloud Run Admin 和 Secret Manager。
  3. 针对未经身份验证的调用的政策:由于我们将根据 Action API(而非使用 IAM)在代码中处理传入请求的身份验证,因此此用例要求我们部署“允许未经身份验证的调用”的 Cloud Run 函数。虽然默认情况下允许这样做,但组织政策通常会限制这种使用方式。具体而言,constraints/iam.allowedPolicyMemberDomains 政策会限制可以授予 IAM 权限的用户,您可能需要调整该政策,以允许 allUsers 主账号获得未经身份验证的访问权限。如果您发现自己无法允许未经身份验证的调用,请参阅本指南中的如何在实施网域限定共享时创建公共 Cloud Run 服务部分,了解详情。
  4. 其他政策:请注意,其他 Google Cloud 组织政策限制条件也可能会阻止部署默认情况下允许的服务。

部署 Cloud Run 函数

创建新项目后,请按以下步骤部署 Cloud Run 函数

  1. Cloud Run functions 中,点击 Create Function(创建函数)。
  2. 为您的函数选择任意名称(例如“demo-bq-insert-action”)。
  3. 触发器设置下:
    1. 触发器类型应已设为“HTTPS”。
    2. 身份验证设置为允许未经身份验证的调用
    3. 网址值复制到剪贴板。
  4. Runtime > Runtime environment variables 设置下:
    1. 点击添加变量
    2. 将变量名称设置为 CALLBACK_URL_PREFIX
    3. 将上一步中的网址粘贴为值。
  5. 点击下一步
  6. 点击 package.json 文件,然后粘贴内容
  7. 点击 index.js 文件,然后粘贴内容
  8. 将文件顶部的 projectId 变量分配给您自己的项目 ID。
  9. 入口点设置为 httpHandler
  10. 点击部署
  11. 向 build 服务账号授予请求的权限(如果有)。
  12. 等待部署完成。
  13. 如果您在后续步骤中收到指示您查看 Google Cloud 日志的错误,请注意,您可以通过此页面上的日志标签页访问此函数的日志。
  14. 在离开 Cloud Run 函数页面之前,请在详细信息标签页下找到并记下该函数所具有的服务账号。我们将在后续步骤中使用此 ID,以确保该函数具有所需的权限。
  15. 访问网址,直接在浏览器中测试函数部署。您应该会看到包含集成列表的 JSON 响应。
  16. 如果您收到 403 错误,则可能是因为组织政策导致您尝试设置允许未经身份验证的调用时静默失败。检查您的函数是否允许未经身份验证的调用,查看您的组织政策设置,然后尝试更新该设置。

对 BigQuery 目标表的访问权限

在实践中,要插入的目标表可以位于其他 Google Cloud 项目中;但出于演示目的,我们将在同一项目中创建新的目标表。无论是哪种情况,您都需要确保 Cloud Run 函数的服务账号有权向表中写入数据。

  1. 前往 BigQuery 控制台
  2. 创建演示表:

    1. 在“探索器”栏中,使用项目旁边的省略号菜单,然后选择创建数据集
    2. 为数据集指定 ID demo_dataset,然后点击创建数据集
    3. 使用新创建的数据集上的省略号菜单,然后选择创建表
    4. 为表命名为 demo_table
    5. Schema 下,选择 Edit as text,使用以下架构,然后点击 Create table

      [
       {"name":"invoked_at","type":"TIMESTAMP"},
       {"name":"invoked_by","type":"STRING"},
       {"name":"scheduled_plan_id","type":"STRING"},
       {"name":"query_result_size","type":"INTEGER"},
       {"name":"choice","type":"STRING"},
       {"name":"note","type":"STRING"}
      ]
      
  3. 分配权限:

    1. 探索器栏中,点击您的数据集。
    2. 数据集页面上,依次点击共享 > 权限
    3. 点击添加主账号
    4. 新主账号设置为您函数的服务账号(本页面上方所述)。
    5. 分配 BigQuery Data Editor 角色。
    6. 点击保存

连接到 Looker

现在,您的函数已部署完毕,我们将将 Looker 与其关联。

  1. 我们需要您的操作的共享密钥,以验证请求是否来自您的 Looker 实例。生成一个长随机字符串,并确保其安全。我们将在后续步骤中将其用作 Looker 密钥值。
  2. 在 Cloud 控制台中,前往 Secret Manager
    1. 点击创建 Secret
    2. 名称设置为 LOOKER_SECRET。(此值在此演示的代码中是硬编码的,但在使用自己的代码时,您可以选择任何名称。)
    3. 密钥值设置为您生成的密钥值。
    4. 点击创建 Secret
    5. 密钥页面上,点击权限标签页。
    6. 点击授予访问权限
    7. 新的主账号设置为您之前记下的函数的服务账号。
    8. 分配 Secret Manager Secret Accessor 角色。
    9. 点击保存
    10. 您可以通过访问附加到函数网址的 /status 路由来确认您的函数是否已成功访问 Secret。
  3. 在 Looker 实例中:
    1. 依次前往“管理”>“平台”>“操作”。
    2. 前往页面底部,点击添加 Action Hub
    3. 提供函数的网址(例如 https://your-region-your-project.cloudfunctions.net/demo-bq-insert-action),然后点击 Add Action Hub 进行确认。
    4. 现在,您应该会看到一个新的 Action Hub 条目,其中包含一个名为 Demo BigQuery Insert 的操作。
    5. 在“Action Hub”条目上,点击配置授权
    6. 将您生成的 Looker Secret 输入 Authorization Token 字段,然后点击 Update Token(更新令牌)。
    7. Demo BigQuery Insert 操作中,点击 Enable(启用)。
    8. 启用开关切换为开启。
    9. 系统应自动运行对该操作的测试,以确认您的函数是否正在接受 Looker 的请求并正确响应表单端点。
    10. 点击保存

端到端测试

现在,我们应该能够实际使用新操作了。此操作已配置为适用于任何查询,因此请选择任何探索(例如内置的“系统活动”探索),向新查询添加一些字段,运行该查询,然后从齿轮菜单中选择发送。您应该会看到该操作作为可用目的地之一,并且系统会提示您输入一些字段:

屏幕截图:Looker 的“发送”模态,其中选择了我们的新操作

Send 后,系统应会将新行插入您的 BigQuery 表中(并在 invoked_by 列中标识 Looker 用户账号的电子邮件地址)!