Python 中用户定义的函数
Python 用户定义的函数 (UDF) 使您可以在 Python 中实现标量函数,并在 SQL 查询中进行使用。Python UDF 与 SQL 和 JavaScript UDF 类似,但具有更多功能。借助 Python UDF,您可以通过 Python 软件包索引 (PyPI) 安装第三方库,还可以使用 Cloud 资源连接访问外部服务。
Python UDF 在 BigQuery 托管资源上构建和运行。
限制
python-3.11
是唯一受支持的运行时。- 您无法创建临时 Python UDF。
- 您无法将 Python UDF 与物化视图搭配使用。
- 调用 Python UDF 的查询的结果不会进行缓存,因为 Python UDF 的返回值始终假定为具有非确定性。
INFORMATION_SCHEMA
视图中未完全支持 Python UDF。- 您无法使用 Routine API 创建或更新 Python UDF。
- 不支持 VPC Service Controls。
- 不支持客户管理的加密密钥 (CMEK)。
- 不支持以下数据类型:
JSON
、RANGE
、INTERVAL
和GEOGRAPHY
。 - 运行 Python UDF 的容器最多只能配置 2 个 vCPU 和 8 Gi。
所需 IAM 角色
所需的 IAM 角色取决于您是 Python UDF 所有者还是 Python UDF 用户。Python UDF 所有者通常会创建或更新 UDF。Python UDF 用户会调用他人创建的 UDF。
如果您创建或运行引用 Cloud 资源连接的 Python UDF,还需要其他角色。
UDF 所有者
如果您要创建或更新 Python UDF,则应被授予针对相应资源的以下预定义 IAM 角色:
角色 | 所需权限 | 资源 |
---|---|---|
BigQuery Data Editor (roles/bigquery.dataEditor )
|
|
在其中创建或更新 Python UDF 的数据集。 |
BigQuery Job User (roles/bigquery.jobUser )
|
|
在其中运行 CREATE FUNCTION 语句的项目。
|
BigQuery Connection Admin (roles/bigquery.connectionAdmin )
|
|
您向其授予对外部资源的访问权限的连接。仅当您的 UDF 使用 WITH CONNECTION 子句访问外部服务时,才需要此连接。
|
UDF 用户
如果您要调用 Python UDF,则应被授予针对相应资源的以下预定义 IAM 角色:
角色 | 所需权限 | 资源 |
---|---|---|
BigQuery User (roles/bigquery.user ) |
bigquery.jobs.create ,用于运行引用 UDF 的查询作业。 |
在其中运行调用 Python UDF 的查询作业的项目。 |
BigQuery Data Viewer (roles/bigquery.dataViewer ) |
bigquery.routines.get ,用于运行他人创建的 UDF。 |
在其中存储 Python UDF 的数据集。 |
BigQuery Connection User (roles/bigquery.connectionUser ) |
bigquery.connections.use ,用于运行引用 Cloud 资源连接的 Python UDF。 |
Python UDF 引用的 Cloud 资源连接。仅当您的 UDF 引用连接时,才需要此连接。 |
如需详细了解 BigQuery 中的角色,请参阅预定义的 IAM 角色。
创建永久性 Python UDF
创建 Python UDF 时,请遵循以下规则:
Python UDF 的主体必须是带英文引号的字符串字面量,用于表示 Python 代码。如需详细了解带英文引号的字符串字面量,请参阅带英文引号的字面量的格式。
Python UDF 的主体必须包含一个 Python 函数,该函数在 Python UDF 选项列表的
entry_point
参数中使用。需要在
runtime_version
选项中指定 Python 运行时版本。唯一受支持的 Python 运行时版本是python-3.11
。如需查看可用选项的完整列表,请参阅CREATE FUNCTION
语句的函数选项列表。
如需创建永久性 Python UDF,请使用 CREATE FUNCTION
语句,并且不带 TEMP
或 TEMPORARY
关键字。如需删除永久性 Python UDF,请使用 DROP FUNCTION
语句。
当您使用 CREATE FUNCTION
语句创建 Python UDF 时,BigQuery 会创建或更新基于基础映像的容器映像。容器使用您的代码和任何指定的软件包依赖项,基于基础映像构建而成。创建容器是一个长时间运行的过程。运行 CREATE FUNCTION
语句后的第一个查询可能会自动等待映像完成。在没有任何外部依赖项的情况下,容器映像通常应在不到一分钟的时间内创建完成。
示例
如需查看创建永久性 Python UDF 的示例,请选择以下选项之一:
控制台
以下示例创建一个名为 multiplyInputs
的永久性 Python UDF,并从 SELECT
语句中调用该 UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyInputs(x FLOAT64, y FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS(runtime_version="python-3.11", entry_point="multiply") AS r''' def multiply(x, y): return x * y '''; -- Call the Python UDF. WITH numbers AS (SELECT 1 AS x, 5 as y UNION ALL SELECT 2 AS x, 10 as y UNION ALL SELECT 3 as x, 15 as y) SELECT x, y, `PROJECT_ID.DATASET_ID`.multiplyInputs(x, y) AS product FROM numbers;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。此示例生成以下输出:
+-----+-----+--------------+ | x | y | product | +-----+-----+--------------+ | 1 | 5 | 5.0 | | 2 | 10 | 20.0 | | 3 | 15 | 45.0 | +-----+-----+--------------+
BigQuery DataFrame
以下示例使用 BigQuery DataFrames 将自定义函数转换为 Python UDF:
创建向量化 Python UDF
您可以使用向量化技术来实现 Python UDF,以处理一批行而不是单个行。向量化可以提高查询性能。
如需控制批处理行为,请使用 CREATE OR REPLACE FUNCTION
选项列表中的 max_batching_rows
选项指定每个批次中的最大行数。如果您指定 max_batching_rows
,BigQuery 会确定批次中的行数(不会超过 max_batching_rows
限制)。如果未指定 max_batching_rows
,系统会自动确定要进行批处理的行数。
向量化 Python UDF 具有必须进行注解的单个 pandas.DataFrame
参数。pandas.DataFrame
参数的列数与 CREATE FUNCTION
语句中定义的 Python UDF 参数相同。pandas.DataFrame
参数中的列名称与 UDF 的参数名称相同。
您的函数需要返回 pandas.Series
或单列 pandas.DataFrame
,且行数与输入相同。
以下示例创建一个名为 multiplyInputs
的向量化 Python UDF,其中包含两个参数 x
和 y
:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyVectorized(x FLOAT64, y FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS(runtime_version="python-3.11", entry_point="vectorized_multiply") AS r''' import pandas as pd def vectorized_multiply(df: pd.DataFrame): return df['x'] * df['y'] ''';
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
调用 UDF 的方式与上一个示例相同。
点击
运行。
支持的 Python UDF 数据类型
下表定义了 BigQuery 数据类型、Python 数据类型和 Pandas 数据类型之间的映射:
BigQuery 数据类型 | 标准 UDF 使用的 Python 内置数据类型 | 向量化 UDF 使用的 Pandas 数据类型 | 向量化 UDF 中用于 ARRAY 和 STRUCT 的 PyArrow 数据类型 |
---|---|---|---|
BOOL |
bool |
BooleanDtype |
DataType(bool) |
INT64 |
int |
Int64Dtype |
DataType(int64) |
FLOAT64 |
float |
FloatDtype |
DataType(double) |
STRING |
str |
StringDtype |
DataType(string) |
BYTES |
bytes |
binary[pyarrow] |
DataType(binary) |
TIMESTAMP |
函数参数: 函数返回值: |
函数参数: 函数返回值: |
TimestampType(timestamp[us]) ,带有时区 |
DATE |
datetime.date |
date32[pyarrow] |
DataType(date32[day]) |
TIME |
datetime.time |
time64[pyarrow] |
Time64Type(time64[us]) |
DATETIME |
datetime.datetime (不带时区) |
timestamp[us][pyarrow] |
TimestampType(timestamp[us]) ,不带时区 |
ARRAY |
list |
list<...>[pyarrow] ,其中元素数据类型为 pandas.ArrowDtype |
ListType |
STRUCT |
dict |
struct<...>[pyarrow] ,其中字段数据类型为 pandas.ArrowDtype |
StructType |
支持的运行时版本
BigQuery Python UDF 支持 python-3.11
运行时。此 Python 版本包含一些额外的预安装软件包。对于系统库,请检查运行时基础映像。
运行时版本 | Python 版本 | 包含 | 运行时基础映像 |
---|---|---|---|
python-3.11 | Python 3.11 | numpy 1.26.3 pyarrow 14.0.2 pandas 2.1.4 python-dateutil 2.8.2 |
google-22-full/python311 |
使用第三方软件包
您可以通过 CREATE FUNCTION
选项列表来使用 Python 标准库和预安装软件包提供的模块以外的模块。您可以通过 Python 软件包索引 (PyPI) 安装软件包,也可以从 Cloud Storage 导入 Python 文件。
通过 Python 软件包索引安装软件包
安装软件包时,您必须提供软件包名称,还可以选择使用 Python 软件包版本说明符提供软件包版本。如果软件包在运行时中,则使用该软件包,除非在 CREATE FUNCTION
选项列表中指定了特定版本。如果未指定软件包版本,并且软件包不在运行时中,则使用最新可用版本。仅支持采用 wheels 二进制格式的软件包。
以下示例展示了如何创建使用 CREATE OR REPLACE FUNCTION
选项列表安装 scipy
软件包的 Python UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.area(radius FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS (entry_point='area_handler', runtime_version='python-3.11', packages=['scipy==1.15.3']) AS r""" import scipy def area_handler(radius): return scipy.constants.pi*radius*radius """; SELECT `PROJECT_ID.DATASET_ID`.area(4.5);
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
将其他 Python 文件作为库导入
您可以通过从 Cloud Storage 导入 Python 文件,使用函数选项列表来扩展 Python UDF。
在 UDF 的 Python 代码中,您可以使用导入语句并且后跟 Cloud Storage 对象的路径,将 Cloud Storage 中的 Python 文件作为模块导入。例如,如果您要导入 gs://BUCKET_NAME/path/to/lib1.py
,则导入语句应为 import path.to.lib1
。
Python 文件名必须是 Python 标识符。对象名称(在 /
之后)中的每个 folder
名称都应是有效的 Python 标识符。在 ASCII 范围 (U+0001..U+007F) 内,标识符中可以使用以下字符:
- 大写字母和小写字母 A 到 Z。
- 下划线。
- 数字 0 到 9,但数字不能作为标识符中的第一个字符。
以下示例展示了如何创建从名为 my_bucket
的 Cloud Storage 存储桶导入 lib1.py
客户端库软件包的 Python UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.myFunc(a FLOAT64, b STRING) RETURNS STRING LANGUAGE python OPTIONS ( entry_point='compute', runtime_version='python-3.11', library=['gs://my_bucket/path/to/lib1.py']) AS r""" import path.to.lib1 as lib1 def compute(a, b): # doInterestingStuff is a function defined in # gs://my_bucket/path/to/lib1.py return lib1.doInterestingStuff(a, b); """;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
为 Python UDF 配置容器限制
您可以使用 CREATE FUNCTION
选项列表为运行 Python UDF 的容器指定 CPU 和内存限制。
默认情况下,为每个容器实例分配的内存为 512 MiB,分配的 CPU 为 0.33 个 vCPU。
以下示例通过使用 CREATE FUNCTION
选项列表指定容器限制来创建一个 Python UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.resizeImage(image BYTES) RETURNS BYTES LANGUAGE python OPTIONS (entry_point='resize_image', runtime_version='python-3.11', packages=['Pillow==11.2.1'], container_memory='2Gi', container_cpu=1) AS r""" import io from PIL import Image def resize_image(image_bytes): img = Image.open(io.BytesIO(image_bytes)) resized_img = img.resize((256, 256), Image.Resampling.LANCZOS) output_stream = io.BytesIO() resized_img.convert('RGB').save(output_stream, format='JPEG') return output_stream.getvalue() """;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
支持的 CPU 值
Python UDF 支持介于 0.33
和 1.0
之间的小数形式 CPU 值,以及非小数形式 CPU 值 1
、2
。在将小数输入值应用于容器之前,系统会将其四舍五入为两位小数。
支持的内存值
Python UDF 容器支持以下格式的内存值:<integer_number><unit>
。单位必须是以下值之一:Mi
、M
、Gi
、G
。您可以配置的内存量下限为 256 兆比字节 (256 Mi)。您可以配置的内存量上限为 8 吉比字节 (8 Gi)。
根据您选择的内存值,您还必须指定 CPU 量下限。下表显示了每个内存值对应的 CPU 下限值:
内存 | CPU 下限 |
---|---|
512 MiB or less |
0.33 |
More than 512 MiB |
0.5 |
More than 1 GiB |
1 |
More than 4 GiB |
2 |
在 Python 代码中调用 Google Cloud 或在线服务
Python UDF 使用 Cloud 资源连接服务账号访问 Google Cloud 服务或外部服务。您必须向连接的服务账号授予访问相应服务的权限。所需的权限因所访问的服务以及从 Python 代码调用的 API 而异。
如果您创建 Python UDF 时未使用 Cloud 资源连接,则函数会在阻止网络访问的环境中执行。如果您的 UDF 会访问在线服务,则必须使用 Cloud 资源连接创建 UDF。如果不这样做,系统会阻止 UDF 访问网络,直到达到内部连接超时时间。
以下示例展示了如何从 Python UDF 访问 Cloud Translation 服务。此示例包含两个项目:一个名为 my_query_project
的项目,您会在其中创建 UDF 和 Cloud 资源连接;以及一个名为 my_translate_project
的项目,您会在其中运行 Cloud Translation。
创建 Cloud 资源连接
首先,您需要在 my_query_project
中创建 Cloud 资源连接。如需创建 Cloud 资源连接,请按照创建 Cloud 资源连接页面上的步骤操作。
创建连接后,打开连接,然后在连接信息窗格中复制服务账号 ID。为连接配置权限时,您需要使用此 ID。当您创建连接资源时,BigQuery 会创建一个唯一的系统服务账号,并将其与该连接相关联。
向连接的服务账号授予访问权限
如需向 Cloud 资源连接服务账号授予对项目的访问权限,请在 my_query_project
中向该服务账号授予 Service Usage Consumer 角色 (roles/serviceusage.serviceUsageConsumer
),并在 my_translate_project
中向该服务账号授予 Cloud Translation API User 角色 (roles/cloudtranslate.user
)。
转到 IAM 页面。
验证是否选择了
my_query_project
。点击
授予访问权限。在新建主账号字段中,输入您之前复制的 Cloud 资源连接的服务账号 ID。
在选择角色字段中,选择 Service Usage,然后选择 Service Usage Consumer。
点击保存。
在项目选择器中,选择
my_translate_project
。转到 IAM 页面。
点击
授予访问权限。在新建主账号字段中,输入您之前复制的 Cloud 资源连接的服务账号 ID。
在选择角色字段中,选择 Cloud Translation,然后选择 Cloud Translation API User。
点击保存。
创建调用 Cloud Translation 服务的 Python UDF
在 my_query_project
中,创建一个 Python UDF,以使用您的 Cloud 资源连接调用 Cloud Translation 服务。
转到 BigQuery 页面。
在查询编辑器中输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.translate_to_es(x STRING) RETURNS STRING LANGUAGE python WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS (entry_point='do_translate', runtime_version='python-3.11', packages=['google-cloud-translate>=3.11', 'google-api-core']) AS r""" from google.api_core.retry import Retry from google.cloud import translate project = "my_translate_project" translate_client = translate.TranslationServiceClient() def do_translate(x : str) -> str: response = translate_client.translate_text( request={ "parent": f"projects/{project}/locations/us-central1", "contents": [x], "target_language_code": "es", "mime_type": "text/plain", }, retry=Retry(), ) return response.translations[0].translated_text """; -- Call the UDF. WITH text_table AS (SELECT "Hello" AS text UNION ALL SELECT "Good morning" AS text UNION ALL SELECT "Goodbye" AS text) SELECT text, `PROJECT_ID.DATASET_ID`.translate_to_es(text) AS translated_text FROM text_table;
替换以下内容:
PROJECT_ID.DATASET_ID
:您的项目 ID 和数据集 IDREGION.CONNECTION_ID
:您的连接的区域和连接 ID
点击
运行。输出应如下所示:
+--------------------------+-------------------------------+ | text | translated_text | +--------------------------+-------------------------------+ | Hello | Hola | | Good morning | Buen dia | | Goodbye | Adios | +--------------------------+-------------------------------+
支持的位置
所有 BigQuery 多区域和单区域位置都支持 Python UDF。
价格
使用 Python UDF 时无需任何额外的费用。
启用结算功能后,以下规则适用:
- Python UDF 费用采用 BigQuery 服务 SKU 计费。
- 费用与调用 Python UDF 时消耗的计算量和内存量成正比。
- Python UDF 客户还需要支付构建或重新构建 UDF 容器映像的费用。此费用与使用客户代码和依赖项构建映像所用的资源成正比。
- 如果 Python UDF 导致外部或互联网网络出站流量,您还会看到 Cloud Networking 收取的高级层级互联网出站流量费用。