Python 中用户定义的函数
借助 Python 用户定义的函数 (UDF),您可以使用 Python 实现标量函数,并在 SQL 查询中使用该函数。Python UDF 与 SQL 和 JavaScript UDF 类似,但具有更多功能。借助 Python UDF,您可以从 Python 软件包索引 (PyPI) 安装第三方库,还可以使用 Cloud 资源连接访问外部服务。
Python UDF 在 BigQuery 管理的资源上构建和运行。
限制
python-3.11
是唯一受支持的运行时。- 您无法创建临时 Python UDF。
- 您无法将 Python UDF 与具体化视图搭配使用。
- 调用 Python UDF 的查询的结果不会被缓存,因为 Python UDF 的返回值始终假定为具有非确定性。
INFORMATION_SCHEMA
视图中不支持 Python UDF。- 您无法使用 Routine API 创建或更新 Python UDF。
- 不支持 VPC Service Controls。
- 客户管理的加密密钥 (CMEK) 不受支持。
- 不支持以下数据类型:
JSON
、RANGE
、INTERVAL
和GEOGRAPHY
。 - 运行 Python UDF 的容器最多只能配置 2 个 vCPU 和 8 Gi。
所需 IAM 角色
所需的 IAM 角色取决于您是 Python UDF 所有者还是 Python UDF 用户。Python UDF 所有者通常会创建或更新 UDF。Python UDF 用户调用他人创建的 UDF。
如果您创建或运行引用 Cloud 资源连接的 Python UDF,还需要其他角色。
UDF 所有者
如果您要创建或更新 Python UDF,则应在相应资源上授予以下预定义的 IAM 角色:
角色 | 所需权限 | 资源 |
---|---|---|
BigQuery Data Editor (roles/bigquery.dataEditor )
|
|
创建或更新 Python UDF 的数据集。 |
BigQuery Job User (roles/bigquery.jobUser )
|
|
您运行 CREATE FUNCTION 语句的项目。
|
BigQuery Connection Admin (roles/bigquery.connectionAdmin )
|
|
您正在授予连接对外部资源的访问权限。仅当您的 UDF 使用 WITH CONNECTION 子句访问外部服务时,才需要此连接。
|
UDF 用户
如果您要调用 Python UDF,则应在相应资源上授予以下预定义的 IAM 角色:
角色 | 所需权限 | 资源 |
---|---|---|
BigQuery User (roles/bigquery.user ) |
bigquery.jobs.create 权限:用于运行引用 UDF 的查询作业。 |
您在其中运行调用 Python UDF 的查询作业的项目。 |
BigQuery Data Viewer (roles/bigquery.dataViewer ) |
bigquery.routines.get 来运行他人创建的 UDF。 |
存储 Python UDF 的数据集。 |
BigQuery Connection User (roles/bigquery.connectionUser ) |
bigquery.connections.use 来运行引用 Cloud 资源连接的 Python UDF。 |
Python UDF 引用的 Cloud 资源连接。仅当您的 UDF 引用了连接时,才需要此连接。 |
如需详细了解 BigQuery 中的角色,请参阅预定义的 IAM 角色。
创建永久性 Python UDF
创建 Python UDF 时,请遵循以下规则:
Python UDF 的正文必须是带英文引号的字符串字面量,用于表示 Python 代码。如需详细了解带英文引号的字符串字面量,请参阅带英文引号的字面量的格式。
Python UDF 的正文必须包含一个 Python 函数,该函数在 Python UDF 选项列表的
entry_point
实参中使用。需要在
runtime_version
选项中指定 Python 运行时版本。唯一受支持的 Python 运行时版本是python-3.11
。如需查看可用选项的完整列表,请参阅CREATE FUNCTION
语句的函数选项列表。
如需创建永久性 Python UDF,请使用 CREATE FUNCTION
语句,并且不带 TEMP
或 TEMPORARY
关键字。如需删除永久性 Python UDF,请使用 DROP FUNCTION
语句。
当您使用 CREATE FUNCTION
语句创建 Python UDF 时,BigQuery 会创建或更新基于基础映像的容器映像。容器基于基础映像使用您的代码和任何指定的软件包依赖项构建而成。创建容器是一个长时间运行的过程。运行 CREATE FUNCTION
语句后的第一个查询可能会自动等待映像完成。在没有任何外部依赖项的情况下,容器映像通常应在不到一分钟的时间内创建完成。
示例
如需查看创建永久性 Python UDF 的示例,请选择以下选项之一:
控制台
以下示例创建一个名为 multiplyInputs
的永久性 Python UDF,并从 SELECT
语句中调用该 UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyInputs(x FLOAT64, y FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS(runtime_version="python-3.11", entry_point="multiply") AS r''' def multiply(x, y): return x * y '''; -- Call the Python UDF. WITH numbers AS (SELECT 1 AS x, 5 as y UNION ALL SELECT 2 AS x, 10 as y UNION ALL SELECT 3 as x, 15 as y) SELECT x, y, `PROJECT_ID.DATASET_ID`.multiplyInputs(x, y) AS product FROM numbers;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。此示例生成以下输出:
+-----+-----+--------------+ | x | y | product | +-----+-----+--------------+ | 1 | 5 | 5.0 | | 2 | 10 | 20.0 | | 3 | 15 | 45.0 | +-----+-----+--------------+
BigQuery DataFrame
以下示例使用 BigQuery DataFrames 将自定义函数转换为 Python UDF:
创建矢量化 Python UDF
您可以使用矢量化技术来实现 Python UDF,以处理一批行而不是单行。矢量化可以提高查询性能。
如需控制批处理行为,请使用CREATE OR REPLACE FUNCTION
选项列表中的 max_batching_rows
选项指定每个批次中的最大行数。
如果您指定 max_batching_rows
,BigQuery 会确定批处理中的行数,但不得超过 max_batching_rows
限制。如果未指定 max_batching_rows
,系统会自动确定要批处理的行数。
矢量化 Python UDF 具有必须进行注释的单个 pandas.DataFrame
实参。pandas.DataFrame
实参的列数与 CREATE FUNCTION
语句中定义的 Python UDF 参数的列数相同。pandas.DataFrame
实参中的列名称与 UDF 的形参名称相同。
您的函数需要返回 pandas.Series
或单列 pandas.DataFrame
,且行数与输入相同。
以下示例创建了一个名为 multiplyInputs
的矢量化 Python UDF,其中包含两个形参:x
和 y
:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.multiplyVectorized(x FLOAT64, y FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS(runtime_version="python-3.11", entry_point="vectorized_multiply") AS r''' import pandas as pd def vectorized_multiply(df: pd.DataFrame): return df['x'] * df['y'] ''';
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
调用 UDF 的方式与上一个示例相同。
点击
运行。
支持的 Python UDF 数据类型
下表定义了 BigQuery 数据类型、Python 数据类型和 Pandas 数据类型之间的映射:
BigQuery 数据类型 | 标准 UDF 使用的 Python 内置数据类型 | 矢量化 UDF 使用的 Pandas 数据类型 | 矢量化 UDF 中用于 ARRAY 和 STRUCT 的 PyArrow 数据类型 |
---|---|---|---|
BOOL |
bool |
BooleanDtype |
DataType(bool) |
INT64 |
int |
Int64Dtype |
DataType(int64) |
FLOAT64 |
float |
FloatDtype |
DataType(double) |
STRING |
str |
StringDtype |
DataType(string) |
BYTES |
bytes |
binary[pyarrow] |
DataType(binary) |
TIMESTAMP |
函数形参: 函数返回值: |
函数形参: 函数返回值: |
TimestampType(timestamp[us]) ,带时区 |
DATE |
datetime.date |
date32[pyarrow] |
DataType(date32[day]) |
TIME |
datetime.time |
time64[pyarrow] |
Time64Type(time64[us]) |
DATETIME |
datetime.datetime (不含时区) |
timestamp[us][pyarrow] |
TimestampType(timestamp[us]) ,不含时区 |
ARRAY |
list |
list<...>[pyarrow] ,其中元素数据类型为 pandas.ArrowDtype |
ListType |
STRUCT |
dict |
struct<...>[pyarrow] ,其中字段数据类型为 pandas.ArrowDtype |
StructType |
支持的运行时版本
BigQuery Python UDF 支持 python-3.11
运行时。此 Python 版本包含一些额外的预安装软件包。对于系统库,请检查运行时基础映像。
运行时版本 | Python 版本 | 包含 | 运行时基础映像 |
---|---|---|---|
python-3.11 | Python 3.11 | numpy 1.26.3 pyarrow 14.0.2 pandas 2.1.4 python-dateutil 2.8.2 |
google-22-full/python311 |
使用第三方软件包
您可以使用选项列表来使用 Python 标准库和预安装软件包提供的模块以外的模块。CREATE FUNCTION
您可以从 Python Package Index (PyPI) 安装软件包,也可以从 Cloud Storage 导入 Python 文件。
从 Python 软件包索引安装软件包
安装软件包时,您必须提供软件包名称,还可以选择使用 Python 软件包版本说明符提供软件包版本。如果软件包位于运行时中,则使用该软件包,除非在 CREATE FUNCTION
选项列表中指定了特定版本。如果未指定软件包版本,并且软件包不在运行时中,则使用最新可用版本。仅支持采用 Wheel 二进制格式的软件包。
以下示例展示了如何创建使用 CREATE OR REPLACE FUNCTION
选项列表安装 scipy
软件包的 Python UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.area(radius FLOAT64) RETURNS FLOAT64 LANGUAGE python OPTIONS (entry_point='area_handler', runtime_version='python-3.11', packages=['scipy==1.15.3']) AS r""" import scipy def area_handler(radius): return scipy.constants.pi*radius*radius """; SELECT `PROJECT_ID.DATASET_ID`.area(4.5);
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
将其他 Python 文件作为库导入
您可以通过从 Cloud Storage 导入 Python 文件,使用函数选项列表来扩展 Python UDF。
在 UDF 的 Python 代码中,您可以使用 import 语句后跟 Cloud Storage 对象的路径,将 Cloud Storage 中的 Python 文件作为模块导入。例如,如果您要导入 gs://BUCKET_NAME/path/to/lib1.py
,则导入语句应为 import path.to.lib1
。
Python 文件名必须是 Python 标识符。对象名称(在 /
之后)中的每个 folder
名称都应是有效的 Python 标识符。在 ASCII 范围 (U+0001..U+007F) 内,标识符中可以使用以下字符:
- 大写字母和小写字母 A 到 Z。
- 下划线。
- 数字 0 到 9,但数字不能作为标识符中的第一个字符。
以下示例展示了如何创建从名为 my_bucket
的 Cloud Storage 存储桶导入 lib1.py
客户端库软件包的 Python UDF:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.myFunc(a FLOAT64, b STRING) RETURNS STRING LANGUAGE python OPTIONS ( entry_point='compute', runtime_version='python-3.11', library=['gs://my_bucket/path/to/lib1.py']) AS r""" import path.to.lib1 as lib1 def compute(a, b): # doInterestingStuff is a function defined in # gs://my_bucket/path/to/lib1.py return lib1.doInterestingStuff(a, b); """;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
为 Python UDF 配置容器限制
您可以使用 CREATE FUNCTION
选项列表为运行 Python UDF 的容器指定 CPU 和内存限制。
默认情况下,系统会为每个容器实例分配 512 MiB 内存和 0.33 个 vCPU。
以下示例使用 CREATE FUNCTION
选项列表创建了一个 Python UDF,用于指定容器限制:
转到 BigQuery 页面。
在查询编辑器中,输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.resizeImage(image BYTES) RETURNS BYTES LANGUAGE python OPTIONS (entry_point='resize_image', runtime_version='python-3.11', packages=['Pillow==11.2.1'], container_memory='2Gi', container_cpu=1) AS r""" import io from PIL import Image def resize_image(image_bytes): img = Image.open(io.BytesIO(image_bytes)) resized_img = img.resize((256, 256), Image.Resampling.LANCZOS) output_stream = io.BytesIO() resized_img.convert('RGB').save(output_stream, format='JPEG') return output_stream.getvalue() """;
替换 PROJECT_ID。DATASET_ID 替换为您的项目 ID 和数据集 ID。
点击
运行。
支持的 CPU 值
Python UDF 支持介于 0.33
和 1.0
之间的分数 CPU 值,以及 1
、2
的非分数 CPU 值。在将小数输入值应用于容器之前,系统会将其四舍五入到小数点后两位。
支持的内存值
Python UDF 容器支持以下格式的内存值:<integer_number><unit>
。单位必须是以下值之一:Mi
、M
、Gi
、G
。您可以配置的内存量下限为 256 Mebibyte (256 Mi)。
您可以配置的最大内存量为 8 吉比字节 (8 Gi)。
根据您选择的内存值,您还必须指定最小 CPU 量。下表显示了每个内存值的最低 CPU 值:
内存 | 最低 CPU |
---|---|
512 MiB or less |
0.33 |
More than 512 MiB |
0.5 |
More than 1 GiB |
1 |
More than 4 GiB |
2 |
在 Python 代码中调用 Google Cloud 或在线服务
Python UDF 使用 Cloud 资源连接服务账号访问 Google Cloud 服务或外部服务。必须向连接的服务账号授予访问相应服务的权限。所需的权限因所访问的服务和从 Python 代码调用的 API 而异。
如果您创建 Python UDF 时未使用 Cloud 资源连接,则该函数会在阻止网络访问的环境中执行。如果您的 UDF 访问在线服务,则必须使用 Cloud 资源连接创建该 UDF。如果不这样做,在达到内部连接超时时间之前,UDF 将无法访问网络。
以下示例展示了如何从 Python UDF 访问 Cloud Translation 服务。此示例包含两个项目:一个名为 my_query_project
的项目,您可以在其中创建 UDF 和 Cloud 资源连接;另一个名为 my_translate_project
的项目,您可以在其中运行 Cloud Translation。
创建 Cloud 资源连接
首先,您需要在 my_query_project
中创建 Cloud 资源连接。如需创建 Cloud 资源连接,请按照创建 Cloud 资源连接页面上的步骤操作。
创建连接后,打开该连接,然后在连接信息窗格中复制服务账号 ID。为连接配置权限时,您需要使用此 ID。当您创建连接资源时,BigQuery 会创建一个唯一的系统服务账号,并将其与该连接相关联。
向连接的服务账号授予访问权限
如需向 Cloud 资源连接服务账号授予对项目的访问权限,请在 my_query_project
中向该服务账号授予服务使用方角色 (roles/serviceusage.serviceUsageConsumer
),并在 my_translate_project
中向该服务账号授予 Cloud Translation API 用户角色 (roles/cloudtranslate.user
)。
转到 IAM 页面。
验证
my_query_project
是否已选中。点击
授予访问权限。在新建主账号字段中,输入您之前复制的 Cloud 资源连接的服务账号 ID。
在选择角色字段中,选择服务使用情况,然后选择服务使用情况消费者。
点击保存。
在项目选择器中,选择
my_translate_project
。转到 IAM 页面。
点击
授予访问权限。在新建主账号字段中,输入您之前复制的 Cloud 资源连接的服务账号 ID。
在选择角色字段中,选择 Cloud Translation,然后选择 Cloud Translation API User。
点击保存。
创建调用 Cloud Translation 服务的 Python UDF
在 my_query_project
中,创建一个 Python UDF,用于使用您的 Cloud 资源连接调用 Cloud Translation 服务。
转到 BigQuery 页面。
在查询编辑器中输入以下
CREATE FUNCTION
语句:CREATE FUNCTION `PROJECT_ID.DATASET_ID`.translate_to_es(x STRING) RETURNS STRING LANGUAGE python WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID` OPTIONS (entry_point='do_translate', runtime_version='python-3.11', packages=['google-cloud-translate>=3.11', 'google-api-core']) AS r""" from google.api_core.retry import Retry from google.cloud import translate project = "my_translate_project" translate_client = translate.TranslationServiceClient() def do_translate(x : str) -> str: response = translate_client.translate_text( request={ "parent": f"projects/{project}/locations/us-central1", "contents": [x], "target_language_code": "es", "mime_type": "text/plain", }, retry=Retry(), ) return response.translations[0].translated_text """; -- Call the UDF. WITH text_table AS (SELECT "Hello" AS text UNION ALL SELECT "Good morning" AS text UNION ALL SELECT "Goodbye" AS text) SELECT text, `PROJECT_ID.DATASET_ID`.translate_to_es(text) AS translated_text FROM text_table;
替换以下内容:
PROJECT_ID.DATASET_ID
:您的项目 ID 和数据集 IDREGION.CONNECTION_ID
:连接的区域和连接 ID
点击
运行。输出应如下所示:
+--------------------------+-------------------------------+ | text | translated_text | +--------------------------+-------------------------------+ | Hello | Hola | | Good morning | Buen dia | | Goodbye | Adios | +--------------------------+-------------------------------+
支持的位置
所有 BigQuery 多区域和区域位置都支持 Python UDF。
价格
Python UDF 免费提供。
启用结算功能后,系统会执行以下操作:
- Python UDF 费用采用 BigQuery Services SKU 计费。
- 费用与调用 Python UDF 时消耗的计算量和内存量成正比。
- Python UDF 客户还需要支付构建或重新构建 UDF 容器映像的费用。此费用与使用客户代码和依赖项构建映像所用的资源成正比。
- 如果 Python UDF 导致外部或互联网网络出站流量,您还会看到 Cloud Networking 收取的优质层级互联网出站流量费用。