使用远程函数

通过 BigQuery 远程函数,您可以使用 SQL 和 JavaScript 以外的语言或 BigQuery 用户定义的函数中不允许的库或服务来实现函数。

概览

BigQuery 远程函数能够与 Cloud Run functionsCloud Run 直接集成,使您可以将 GoogleSQL 的功能与 BigQuery 之外的软件结合使用。借助 BigQuery 远程函数,您可以在使用任何支持的语言实现的 Cloud Run functions 或 Cloud Run 中部署您的函数,然后从 GoogleSQL 查询调用这些函数。

工作流

  1. 在 Cloud Functions 或 Cloud Run 中创建 HTTP 端点。
  2. 在 BigQuery 中创建远程函数。
    1. 创建类型为 CLOUD_RESOURCE 的连接。
    2. 创建一个远程函数。
  3. 与任何其他用户定义的函数一样,在查询中使用该远程函数。

限制

  • 远程函数仅支持以下数据类型之一作为参数类型或返回值类型:

    • 布尔值
    • 字节
    • 数字
    • 字符串
    • 日期
    • 日期时间
    • 时间
    • 时间戳
    • JSON

    远程函数不支持 ARRAYSTRUCTINTERVALGEOGRAPHY 类型。

  • 您无法创建临时远程函数。

  • 您无法创建表值型远程函数。

  • 创建具体化视图时不能使用远程函数。

  • 远程函数的返回值始终假定为具有非确定性,因此系统不会缓存调用远程函数的查询的结果。

  • 由于暂时性网络错误或 BigQuery 内部错误,您可能会看到向端点发送相同数据的重复请求,即使在收到成功响应后也会出现这种情况。

  • 当由于短路而跳过某些行的远程函数评估时,例如,在条件表达式或带有 WHEN [NOT] MATCHEDMERGE 语句中,批处理不会与远程函数一起使用。在这种情况下,HTTP 请求正文中的 calls 字段有且仅有一个元素。

  • 如果通过跨区域数据集复制将与远程函数关联的数据集复制到目标区域,则只能在远程函数的创建区域中查询该远程函数。

创建端点

如需创建可实现业务逻辑的远程函数,您必须使用 Cloud Run Functions 或 Cloud Run 创建 HTTP 端点。该端点必须能够在单个 HTTP POST 请求中处理一批行,并将该批处理的结果作为 HTTP 响应返回。

如果您使用 BigQuery DataFrame 创建远程函数,则无需手动创建 HTTP 端点;该服务会自动为您执行此操作。

如需了解如何编写、部署、测试和维护 Cloud Run functions 函数,请参阅 Cloud Run functions 教程及其他 Cloud Run functions 文档

请参阅 Cloud Run 快速入门和其他 Cloud Run 文档,了解如何编写、部署、测试和维护 Cloud Run 服务。

建议您保留默认身份验证,而不是允许对 Cloud Run 函数或 Cloud Run 服务进行未经身份验证的调用。

输入格式

BigQuery 会发送包含 JSON 正文的 HTTP POST 请求,格式如下:

字段名称 说明 字段类型
requestId 请求的 ID。在 GoogleSQL 查询中,该 ID 在发送到此端点的多个请求中必须唯一。 始终提供。字符串。
调用方 调用远程函数的 GoogleSQL 查询的作业完整资源名称。 始终提供。字符串。
sessionUser 执行 GoogleSQL 查询的用户的电子邮件地址。 始终提供。字符串。
userDefinedContext 在 BigQuery 中创建远程函数时使用的用户定义上下文。 可选。具有键值对的 JSON 对象。
个调用 一批输入数据。 始终提供。JSON 数组。

每个元素本身都是一个 JSON 数组,即一个远程函数调用的 JSON 编码参数列表。

请求的示例:

{
 "requestId": "124ab1c",
 "caller": "//bigquery.googleapis.com/projects/myproject/jobs/myproject:US.bquxjob_5b4c112c_17961fafeaf",
 "sessionUser": "test-user@test-company.com",
 "userDefinedContext": {
  "key1": "value1",
  "key2": "v2"
 },
 "calls": [
  [null, 1, "", "abc"],
  ["abc", "9007199254740993", null, null]
 ]
}

输出格式

BigQuery 要求端点按以下格式返回 HTTP 响应,否则 BigQuery 将无法使用该端点,并且调用远程函数的查询将会失败。

字段名称 说明 值范围
replies 一批返回值。 如需成功响应,此字段是必需的。JSON 数组。

每个元素对应于外部函数的 JSON 编码返回值。

该数组的大小必须与 HTTP 请求中 calls 的 JSON 数组的大小相匹配。例如,如果 calls 中的 JSON 数组有 4 个元素,则此 JSON 数组也需要包含 4 个元素。

errorMessage 返回除 200 之外的 HTTP 响应代码时显示的错误消息。对于不可重试的错误,我们会将其作为 BigQuery 作业错误消息的一部分返回给用户。 可选。字符串。大小应小于 1KB。

成功响应的示例:

{
  "replies": [
    1,
    0
  ]
}

失败响应的示例:

{
  "errorMessage": "Received but not expected that the argument 0 be null".
}

HTTP 响应代码

您的端点应返回 HTTP 响应代码 200 以表示成功响应。BigQuery 收到任何其他值时,会将响应视为失败,并在 HTTP 响应代码为 408、429、500、503 或 504 时重试,直到出现一些内部限制为止。

SQL 数据类型的 JSON 编码

HTTP 请求/响应中的 JSON 编码遵循适用于 TO_JSON_STRING 函数现有 BigQuery JSON 编码

Cloud Run 函数代码示例

以下 Python 代码示例会添加远程函数的所有整数参数。它使用批量调用的参数来处理请求,并在响应中返回所有结果。

import functions_framework

from flask import jsonify

# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992

@functions_framework.http
def batch_add(request):
  try:
    return_value = []
    request_json = request.get_json()
    calls = request_json['calls']
    for call in calls:
      return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
    replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
    return_json = jsonify( { "replies":  replies } )
    return return_json
  except Exception as e:
    return jsonify( { "errorMessage": str(e) } ), 400

假设函数在位于 us-east1 区域的 my_gcf_project 项目中以函数名称 remote_add 进行部署,则可以通过端点 https://us-east1-my_gcf_project.cloudfunctions.net/remote_add 访问该函数。

Cloud Run 代码示例

以下 Python 代码示例实现了一项 Web 服务,您可以构建该服务并部署到 Cloud Run 以实现相同的功能。

import os

from flask import Flask, request, jsonify

# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992

app = Flask(__name__)

@app.route("/", methods=['POST'])
def batch_add():
  try:
    return_value = []
    request_json = request.get_json()
    calls = request_json['calls']
    for call in calls:
      return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
    replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
    return jsonify( { "replies" :  replies } )
  except Exception as e:
    return jsonify( { "errorMessage": str(e) } ), 400

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

如需了解如何构建和部署代码,请参阅指南

假设 Cloud Run 服务已部署在项目 my_gcf_project 和区域 us-east1 中,服务名称为 remote_add,可以通过端点 https://remote_add-<project_id_hash>-ue.a.run.app 来访问它。

创建远程函数

BigQuery 使用 CLOUD_RESOURCE 连接与 Cloud Run 函数进行交互。如需创建远程函数,您必须创建 CLOUD_RESOURCE 连接。如果您使用 BigQuery DataFrame 创建远程函数,并且您已被授予 Project IAM Admin (roles/resourcemanager.projectIamAdmin) 角色,则无需手动创建连接和授予访问权限;该服务会自动为您执行此操作。

创建连接

您必须具有 Cloud 资源连接才能连接到 Cloud Run 函数和 Cloud Run。

从下列选项中选择一项:

控制台

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 如需创建连接,请点击 添加,然后点击与外部数据源的连接

  3. 连接类型列表中,选择 Vertex AI 远程模型、远程函数和 BigLake(Cloud 资源)

  4. 连接 ID 字段中,输入连接的名称。

  5. 点击创建连接

  6. 点击转到连接

  7. 连接信息窗格中,复制服务账号 ID 以在后续步骤中使用。

bq

  1. 在命令行环境中,创建连接:

    bq mk --connection --location=REGION --project_id=PROJECT_ID \
        --connection_type=CLOUD_RESOURCE CONNECTION_ID

    --project_id 参数会替换默认项目。

    替换以下内容:

    • REGION:您的连接区域
    • PROJECT_ID:您的 Google Cloud 项目 ID
    • CONNECTION_ID:您的连接的 ID

    当您创建连接资源时,BigQuery 会创建一个唯一的系统服务账号,并将其与该连接相关联。

    问题排查:如果您收到以下连接错误,请更新 Google Cloud SDK

    Flags parsing error: flag --connection_type=CLOUD_RESOURCE: value should be one of...
    
  2. 检索并复制服务账号 ID 以在后续步骤中使用:

    bq show --connection PROJECT_ID.REGION.CONNECTION_ID

    输出类似于以下内容:

    name                          properties
    1234.REGION.CONNECTION_ID     {"serviceAccountId": "connection-1234-9u56h9@gcp-sa-bigquery-condel.iam.gserviceaccount.com"}
    

Terraform

使用 google_bigquery_connection 资源。

如需向 BigQuery 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为客户端库设置身份验证

以下示例会在 US 区域中创建一个名为 my_cloud_resource_connection 的 Cloud 资源连接:


# This queries the provider for project information.
data "google_project" "default" {}

# This creates a cloud resource connection in the US region named my_cloud_resource_connection.
# Note: The cloud resource nested object has only one output field - serviceAccountId.
resource "google_bigquery_connection" "default" {
  connection_id = "my_cloud_resource_connection"
  project       = data.google_project.default.project_id
  location      = "US"
  cloud_resource {}
}

如需在 Google Cloud 项目中应用 Terraform 配置,请完成以下部分中的步骤。

准备 Cloud Shell

  1. 启动 Cloud Shell
  2. 设置要在其中应用 Terraform 配置的默认 Google Cloud 项目。

    您只需为每个项目运行一次以下命令,即可在任何目录中运行它。

    export GOOGLE_CLOUD_PROJECT=PROJECT_ID

    如果您在 Terraform 配置文件中设置显式值,则环境变量会被替换。

准备目录

每个 Terraform 配置文件都必须有自己的目录(也称为“根模块”)。

  1. Cloud Shell 中,创建一个目录,并在该目录中创建一个新文件。文件名必须具有 .tf 扩展名,例如 main.tf。在本教程中,该文件称为 main.tf
    mkdir DIRECTORY && cd DIRECTORY && touch main.tf
  2. 如果您按照教程进行操作,可以在每个部分或步骤中复制示例代码。

    将示例代码复制到新创建的 main.tf 中。

    (可选)从 GitHub 中复制代码。如果端到端解决方案包含 Terraform 代码段,则建议这样做。

  3. 查看和修改要应用到您的环境的示例参数。
  4. 保存更改。
  5. 初始化 Terraform。您只需为每个目录执行一次此操作。
    terraform init

    (可选)如需使用最新的 Google 提供程序版本,请添加 -upgrade 选项:

    terraform init -upgrade

应用更改

  1. 查看配置并验证 Terraform 将创建或更新的资源是否符合您的预期:
    terraform plan

    根据需要更正配置。

  2. 通过运行以下命令并在提示符处输入 yes 来应用 Terraform 配置:
    terraform apply

    等待 Terraform 显示“应用完成!”消息。

  3. 打开您的 Google Cloud 项目以查看结果。在 Google Cloud 控制台的界面中找到资源,以确保 Terraform 已创建或更新它们。

设置访问权限

您必须向新连接授予对 Cloud Run 函数或 Cloud Run 服务的只读权限。建议不允许对 Cloud Run 函数或 Cloud Run 服务进行未经身份验证的调用。

要授予角色,请按以下步骤操作:

  1. 前往 IAM 和管理页面。

    转到“IAM 和管理”

  2. 点击 Add

    系统随即会打开添加主账号对话框。

  3. 新的主账号字段中,输入您之前复制的服务账号 ID。

  4. 选择角色字段中,选择以下选项之一:

    • 如果您使用的是第 1 代 Cloud Run 函数,请选择 Cloud Function,然后选择 Cloud Function Invoker 角色
    • 如果您使用的是第 2 代 Cloud Run 函数,请选择 Cloud Run,然后选择 Cloud Run Invoker 角色
    • 如果您使用的是 Cloud Run 服务,请选择 Cloud Run,然后选择 Cloud Run Invoker 角色
  5. 点击保存

创建远程函数

如需创建远程函数,请执行以下操作:

SQL

在 BigQuery 中运行以下 CREATE FUNCTION 语句:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    CREATE FUNCTION PROJECT_ID.DATASET_ID.remote_add(x INT64, y INT64) RETURNS INT64
    REMOTE WITH CONNECTION PROJECT_ID.LOCATION.CONNECTION_NAME
    OPTIONS (
      endpoint = 'ENDPOINT_URL'
    )

    请替换以下内容:

    • DATASET_ID:BigQuery 数据集的 ID。
    • ENDPOINT_URL:Cloud Run 函数或 Cloud Run 远程函数端点的网址。

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

BigQuery DataFrame

  1. 启用所需的 API 并确保您拥有所需的角色,如远程函数要求部分所述。
  2. 使用 remote_function 修饰器

    import bigframes.pandas as bpd
    
    # Set BigQuery DataFrames options
    bpd.options.bigquery.project = your_gcp_project_id
    bpd.options.bigquery.location = "us"
    
    # BigQuery DataFrames gives you the ability to turn your custom scalar
    # functions into a BigQuery remote function. It requires the GCP project to
    # be set up appropriately and the user having sufficient privileges to use
    # them. One can find more details about the usage and the requirements via
    # `help` command.
    help(bpd.remote_function)
    
    # Read a table and inspect the column of interest.
    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    df["body_mass_g"].head(10)
    
    # Define a custom function, and specify the intent to turn it into a remote
    # function. It requires a BigQuery connection. If the connection is not
    # already created, BigQuery DataFrames will attempt to create one assuming
    # the necessary APIs and IAM permissions are setup in the project. In our
    # examples we will be letting the default connection `bigframes-default-connection`
    # be used. We will also set `reuse=False` to make sure we don't
    # step over someone else creating remote function in the same project from
    # the exact same source code at the same time. Let's try a `pandas`-like use
    # case in which we want to apply a user defined scalar function to every
    # value in a `Series`, more specifically bucketize the `body_mass_g` value
    # of the penguins, which is a real number, into a category, which is a
    # string.
    @bpd.remote_function(
        float,
        str,
        reuse=False,
    )
    def get_bucket(num: float) -> str:
        if not num:
            return "NA"
        boundary = 4000
        return "at_or_above_4000" if num >= boundary else "below_4000"
    
    # Then we can apply the remote function on the `Series`` of interest via
    # `apply` API and store the result in a new column in the DataFrame.
    df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket))
    
    # This will add a new column `body_mass_bucket` in the DataFrame. You can
    # preview the original value and the bucketized value side by side.
    df[["body_mass_g", "body_mass_bucket"]].head(10)
    
    # The above operation was possible by doing all the computation on the
    # cloud. For that, there is a google cloud function deployed by serializing
    # the user code, and a BigQuery remote function created to call the cloud
    # function via the latter's http endpoint on the data in the DataFrame.
    
    # The BigQuery remote function created to support the BigQuery DataFrames
    # remote function can be located via a property `bigframes_remote_function`
    # set in the remote function object.
    print(f"Created BQ remote function: {get_bucket.bigframes_remote_function}")
    
    # The cloud function can be located via another property
    # `bigframes_cloud_function` set in the remote function object.
    print(f"Created cloud function: {get_bucket.bigframes_cloud_function}")
    
    # Warning: The deployed cloud function may be visible to other users with
    # sufficient privilege in the project, so the user should be careful about
    # having any sensitive data in the code that will be deployed as a remote
    # function.
    
    # Let's continue trying other potential use cases of remote functions. Let's
    # say we consider the `species`, `island` and `sex` of the penguins
    # sensitive information and want to redact that by replacing with their hash
    # code instead. Let's define another scalar custom function and decorate it
    # as a remote function. The custom function in this example has external
    # package dependency, which can be specified via `packages` parameter.
    @bpd.remote_function(
        str,
        str,
        reuse=False,
        packages=["cryptography"],
    )
    def get_hash(input: str) -> str:
        from cryptography.fernet import Fernet
    
        # handle missing value
        if input is None:
            input = ""
    
        key = Fernet.generate_key()
        f = Fernet(key)
        return f.encrypt(input.encode()).decode()
    
    # We can use this remote function in another `pandas`-like API `map` that
    # can be applied on a DataFrame
    df_redacted = df[["species", "island", "sex"]].map(get_hash)
    df_redacted.head(10)
    
    

您需要对创建远程函数的数据集具有 bigquery.routines.create 权限,并且对远程函数所使用的连接具有 bigquery.connections.delegate 权限(可通过 BigQuery Connection Admin 角色授予此权限)。

提供用户定义的上下文

您可以在 OPTIONS 中以键值对的形式指定 user_defined_context,这是发送到端点的每个 HTTP 请求的组成部分。通过用户定义的上下文,您可以创建多个远程函数,但只能重复使用一个端点,该端点会根据传递给它的上下文提供不同的行为。

以下示例会创建两个远程函数,以使用同一端点加密和解密 BYTES 数据。

CREATE FUNCTION `PROJECT_ID.DATASET_ID`.encrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
  endpoint = 'ENDPOINT_URL',
  user_defined_context = [("mode", "encryption")]
)

CREATE FUNCTION `PROJECT_ID.DATASET_ID`.decrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
  endpoint = 'ENDPOINT_URL',
  user_defined_context = [("mode", "decryption")]
)

限制批量请求中的行数

您可以在 OPTIONS 中指定 max_batching_rows 作为每个 HTTP 请求中的行数上限,以避免 Cloud Run 函数超时。如果未指定该选项,BigQuery 会自行确定每个批处理中包含的行数。

在查询中使用远程函数

确保您已授予对 Cloud Run 函数的权限,以便与远程函数连接关联的 BigQuery 服务账号可以访问该函数。

您还需要对远程函数所在的数据集具有 bigquery.routines.get 权限,以及针对远程函数所使用的连接的 bigquery.connections.use 权限(可通过 BigQuery Connection User 角色获得)。

您可以在查询中使用远程函数,就像使用用户定义的函数一样。

例如,您可以在示例查询中使用 remote_add 函数:

SELECT
  val,
  `PROJECT_ID.DATASET_ID`.remote_add(val, 2)
FROM
  UNNEST([NULL,2,3,5,8]) AS val;

此示例生成以下输出:

+------+-----+
|  val | f0_ |
+------+-----+
| NULL |   2 |
|    2 |   4 |
|    3 |   5 |
|    5 |   7 |
|    8 |  10 |
+------+-----+

支持的区域

BigQuery 中有两种类型的位置:

  • 单区域位置是具体的地理位置,如伦敦。

  • 多区域位置是至少包含两个地理位置的大型地理区域,如美国。

单区域

在 BigQuery 单区域数据集中,您所创建的远程函数只能使用同一区域中部署的 Cloud Run 函数。例如:

  • BigQuery 单区域 us-east4 中的远程函数只能使用 us-east4 中的 Cloud Run 函数。

因此,对于单区域,只有同时支持 Cloud Run 函数和 BigQuery 的区域才支持使用远程函数。

多区域

在 BigQuery 多区域(USEU)数据集中,您所创建的远程函数只能使用同一大型地理区域(美国、欧盟)内的区域中部署的 Cloud Run 函数。例如:

  • BigQuery US 多区域中的远程函数只能使用美国地理区域内任何单区域(例如 us-central1us-east4us-west2 等)中部署的 Cloud Run 函数。
  • BigQuery EU 多区域中的远程函数只能使用欧盟成员国内任何单区域(例如 europe-north1europe-west3 等)中部署的 Cloud Run 函数。

如需详细了解 BigQuery 单区域和多区域,请参阅数据集位置页面。如需详细了解 Cloud Run 函数区域,请参阅 Cloud Run 函数位置页面。

连接

无论是单区域位置还是多区域位置,您都只能在与所使用连接相同的位置创建远程函数。例如,如需在 US 多区域创建远程函数,请使用位于 US 多区域的连接。

价格

使用 VPC Service Controls

VPC Service Controls 是 Google Cloud 的一项功能,可让您设置安全的边界以防数据渗漏。如需将 VPC Service Controls 与远程函数搭配使用以增强安全性,或将端点与 internal traffic 入站流量设置搭配使用,请按照 VPC Service Controls 指南执行以下操作:

  1. 创建服务边界。

  2. 使用远程函数将查询的 BigQuery 项目添加到边界中。

  3. 将端点项目添加到边界中,并根据端点类型在受限服务中设置 Cloud Functions APICloud Run API。如需了解详情,请参阅 Cloud Run Functions VPC Service ControlsCloud Run VPC Service Controls

远程函数的最佳实践

  • 预过滤输入:如果在将输入传递到远程函数之前可以方便地进行过滤以减少其数量,您的查询速度将更快,费用会更低。

  • 让您的 Cloud Run 函数具备可伸缩性。可扩缩性是实例数下限实例数上限并发的函数。

    • 尽可能使用 Cloud Run 函数的实例数上限的默认值。
    • 请注意,第 1 代 HTTP Cloud Run 函数没有默认限制。为避免在测试或生产中使用第 1 代 HTTP Cloud Run 函数进行无界限伸缩,我们建议设置限制,例如 3000。
  • 此外可遵循其他 Cloud Run 函数提示以获得更好的性能。与高延迟 Cloud Run 函数交互的远程函数查询可能会因超时而失败。

  • 实现端点,以便针对失败的响应返回正确的 HTTP 响应代码和载荷。

    • 为尽量减少 BigQuery 的重试次数,请为失败的响应使用除 408、429、500、503 和 504 之外的其他 HTTP 响应代码,并确保捕获函数代码中的所有异常。否则,HTTP 服务框架可能会针对任何未捕获的异常自动返回 500。当 BigQuery 重试失败的数据分区或查询时,您可能仍会看到重试的 HTTP 请求。

    • 对于失败的响应,您的端点应返回采用定义格式的 JSON 载荷。即使不是严格要求,也能帮助 BigQuery 区分失败响应是来自函数实现还是 Cloud Functions 函数/Cloud Run 的基础架构。对于后者,BigQuery 可能会使用不同的内部限制条件进行重试。

配额

如需了解远程函数配额,请参阅配额和限制