使用 Apache Spark 的存储过程

本文档面向数据工程师、数据科学家和数据分析师,用于在 BigQuery 中创建和调用 Spark 存储过程。

BigQuery 可让您创建并运行以 Python、Java 和 Scala 编写的 Spark 存储过程。然后,您可以使用 Google SQL 查询在 BigQuery 中运行这些存储过程,与运行 SQL 存储过程类似。

须知事项

要为 Spark 创建存储过程,请让您的管理员创建一个 Spark 连接并与您共享。您的管理员还必须为与连接关联的服务账号授予所需的 Identity and Access Management (IAM) 权限

所需的角色

如需获得执行本文档中的任务所需的权限,请让管理员为您授予以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理访问权限

这些预定义角色包含执行本文档中的任务所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需执行本文档中的任务,您需要拥有以下权限:

  • 创建连接
    • bigquery.connections.create
    • bigquery.connections.list
  • 为 Spark 创建存储过程:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • 为 Spark 调用存储过程:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

位置注意事项

您必须在与连接相同的位置为 Spark 创建存储过程,因为存储过程与连接相同的位置运行。例如,要在美国多区域中创建存储过程,请使用位于美国多区域中的连接。

价格

  • 在 BigQuery 上运行 Spark 过程的费用与在 Dataproc Serverless 上运行 Spark 过程的费用类似。如需了解详情,请参阅 Dataproc Serverless 价格

  • Spark 存储过程可与按需价格模式以及任何 BigQuery 版本搭配使用。无论项目中使用何种计算价格模式,Spark 过程在所有情况下都使用 BigQuery 企业版随用随付模式来收费。

  • 适用于 BigQuery 的 Spark 存储过程不支持使用预留或承诺。现有预留和承诺将继续用于其他受支持的查询和过程。Spark 存储过程的使用费会作为企业版即随用随付费用添加到您的账单中,并将应用适用的组织折扣。

  • Spark 存储过程使用 Spark 执行引擎,但您不会看到单独的 Spark 执行费用。如上所述,相应费用会作为 BigQuery 企业版随用随付 SKU 计入。

  • Spark 存储过程不提供免费层级。

创建 Spark 存储过程

您必须在与连接相同的位置中创建存储过程。

如果存储过程的主体超过 1 MB,则建议您将存储过程存放在 Cloud Storage 存储桶内的文件中,而不是使用内嵌代码。 BigQuery 提供了两种使用 Python 创建 Spark 存储过程的方法:

使用 SQL 查询编辑器

如需在 SQL 查询编辑器中创建 Spark 存储过程,请按以下步骤操作:

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,为显示的 CREATE PROCEDURE 语句添加示例代码。

    或者,在探索器窗格中,点击项目中您用于创建连接资源的连接。如需为 Spark 创建存储过程,请点击 创建存储过程

    Python

    如需使用 Python 创建 Spark 存储过程,请使用以下示例代码:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java 或 Scala

    如需使用 Java 或 Scala 以及 main_file_uri 选项创建 Spark 存储过程,请使用以下示例代码:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    如需使用 Java 或 Scala 以及 main_classjar_uris 选项创建 Spark 存储过程,请使用以下示例代码:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    请替换以下内容:

    • PROJECT_ID:您要在其中创建存储过程的项目,例如 myproject
    • DATASET:您要在其中创建存储过程的数据集,例如 mydataset
    • PROCEDURE_NAME:您要在 BigQuery 中运行的存储过程的名称,例如 mysparkprocedure
    • PROCEDURE_ARGUMENT:一个参数,用于键入输入参数。

      在此参数中,指定以下字段:

      • ARGUMENT_MODE:参数的模式。

        有效值包括 INOUTINOUT。默认情况下,值为 IN

      • ARGUMENT_NAME:参数的名称。
      • ARGUMENT_TYPE:参数的类型。

      例如:myproject.mydataset.mysparkproc(num INT64)

      如需了解详情,请参阅本文档中的“将值作为 IN 参数OUTINOUT 参数传递”。

    • CONNECTION_PROJECT_ID:包含运行 Spark 过程的连接的项目。
    • CONNECTION_REGION:包含运行 Spark 过程的连接的区域,例如 us
    • CONNECTION_ID:连接 ID,例如 myconnection

      当您在 Google Cloud 控制台中查看连接详情时,连接 ID 是连接 ID 中显示的完全限定连接 ID 的最后一部分中的值,例如 projects/myproject/locations/connection_location/connections/myconnection

    • RUNTIME_VERSION:Spark 的运行时版本,例如 1.1
    • MAIN_PYTHON_FILE_URI:PySpark 文件的路径,例如 gs://mybucket/mypysparkmain.py

      或者,如果要在 CREATE PROCEDURE 语句中添加存储过程的主体,请在 LANGUAGE PYTHON AS 后面添加 PYSPARK_CODE,如本文中使用内嵌代码中的示例所示。

    • PYSPARK_CODECREATE PROCEDURE 语句中的 PySpark 应用的定义(如果要以内嵌方式传递存储过程的主体)。

      该值为字符串字面量。如果代码包含英文引号和反斜杠,则必须对它们进行转义或表示为原始字符串。例如,代码 return "\n"; 可以表示为以下之一:

      • 带英文引号的字符串:"return \"\\n\";"。同时对引号和反斜杠进行转义。
      • 带英文三引号的字符串:"""return "\\n";"""。对反斜杠进行转义,但不对引号进行转义。
      • 原始字符串:r"""return "\n";"""。无需进行转义。
      如需了解如何添加内嵌 PySpark 代码,请参阅使用内嵌代码
    • MAIN_JAR_URI:包含 main 类的 JAR 文件的路径,例如 gs://mybucket/my_main.jar
    • CLASS_NAME:使用 jar_uris 选项设置的 JAR 中类的完全限定名称,例如 com.example.wordcount
    • URI:包含 main 类中指定的类的 JAR 文件的路径,例如 gs://mybucket/mypysparkmain.jar

    如需了解您可以在 OPTIONS 中指定的其他选项,请参阅过程选项列表

使用 PySpark 编辑器

使用 PySpark 编辑器创建过程时,您无需使用 CREATE PROCEDURE 语句。而是直接在 Pyspark 编辑器中添加 Python 代码,然后保存或运行代码。

如需在 PySpark 编辑器中创建 Spark 存储过程,请按照以下步骤操作:

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 如果要直接输入 PySpark 代码,请打开 PySpark 编辑器。如需打开 PySpark 编辑器,请点击 创建 SQL 查询旁边的 菜单,然后选择创建 PySpark 过程

  3. 如需设置选项,请点击更多 > PySpark 选项,然后执行以下操作:

    1. 指定要在其中运行 PySpark 代码的位置。

    2. 连接字段中,指定 Spark 连接。

    3. 已存储的过程调用部分,指定要在其中存储生成的临时存储过程的数据集。您可以设置特定数据集,或允许使用临时数据集来调用 PySpark 代码。

      临时数据集使用上一步中指定的位置生成。如果指定了数据集名称,请确保数据集和 Spark 连接必须位于同一位置。

    4. 参数部分,为存储过程定义参数。参数的值仅在 PySpark 代码的会话内运行期间使用,但声明本身存储在相应的存储过程中。

    5. 高级选项部分中,指定过程选项。如需查看过程选项的详细列表,请参阅过程选项列表

    6. 属性部分,添加键值对以配置作业。您可以使用 Dataproc Serverless Spark 属性中提供的任何键值对。

    7. 服务账号设置中,指定在 PySpark 代码的会话内运行期间要使用的自定义服务账号、CMEK、暂存数据集和暂存 Cloud Storage 文件夹。

    8. 点击保存

为 Spark 保存存储过程

使用 PySpark 编辑器创建存储过程后,您可以保存存储过程。请按以下步骤操作:

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,将 Python 与 PySpark 编辑器搭配使用为 Spark 创建存储过程。

  3. 点击 保存 > 保存过程

  4. 保存存储过程对话框中,指定要保存存储过程的数据集名称以及存储过程的名称。

  5. 点击保存

    如果只想运行 PySpark 代码,而不是将其保存为存储过程,则可以点击运行而不是保存

使用自定义容器

自定义容器为工作负载的驱动程序和执行程序进程提供运行时环境。如需使用自定义容器,请使用以下示例代码:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

替换以下内容:

  • PROJECT_ID:您要在其中创建存储过程的项目,例如 myproject
  • DATASET:您要在其中创建存储过程的数据集,例如 mydataset
  • PROCEDURE_NAME:您要在 BigQuery 中运行的存储过程的名称,例如 mysparkprocedure
  • PROCEDURE_ARGUMENT:一个参数,用于键入输入参数。

    在此参数中,指定以下字段:

    • ARGUMENT_MODE:参数的模式。

      有效值包括 INOUTINOUT。默认情况下,值为 IN

    • ARGUMENT_NAME:参数的名称。
    • ARGUMENT_TYPE:参数的类型。

    例如:myproject.mydataset.mysparkproc(num INT64)

    如需了解详情,请参阅本文档中的“将值作为 IN 参数OUTINOUT 参数传递”。

  • CONNECTION_PROJECT_ID:包含运行 Spark 过程的连接的项目。
  • CONNECTION_REGION:包含运行 Spark 过程的连接的区域,例如 us
  • CONNECTION_ID:连接 ID,例如 myconnection

    当您在 Google Cloud 控制台中查看连接详情时,连接 ID 是连接 ID 中显示的完全限定连接 ID 的最后一部分中的值,例如 projects/myproject/locations/connection_location/connections/myconnection

  • RUNTIME_VERSION:Spark 的运行时版本,例如 1.1
  • MAIN_PYTHON_FILE_URI:PySpark 文件的路径,例如 gs://mybucket/mypysparkmain.py

    或者,如果要在 CREATE PROCEDURE 语句中添加存储过程的主体,请在 LANGUAGE PYTHON AS 后面添加 PYSPARK_CODE,如本文中使用内嵌代码中的示例所示。

  • PYSPARK_CODECREATE PROCEDURE 语句中的 PySpark 应用的定义(如果要以内嵌方式传递存储过程的主体)。

    该值为字符串字面量。如果代码包含英文引号和反斜杠,则必须对它们进行转义或表示为原始字符串。例如,代码 return "\n"; 可以表示为以下之一:

    • 带英文引号的字符串:"return \"\\n\";"。同时对英文引号和反斜杠进行转义。
    • 带英文三引号的字符串:"""return "\\n";"""。对反斜杠进行转义,但不对英文引号进行转义。
    • 原始字符串:r"""return "\n";"""。无需进行转义。
    如需了解如何添加内嵌 PySpark 代码,请参阅使用内嵌代码
  • CONTAINER_IMAGEArtifacts Registry 中的映像路径。它只能包含要在过程中使用的库。如果未指定,则系统会使用与运行时版本关联的系统默认容器映像。

如需详细了解如何使用 Spark 构建自定义容器映像,请参阅构建自定义容器映像

调用 Spark 存储过程

创建存储过程后,您可以使用以下方式之一来调用它:

控制台

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 探索器窗格中,展开您的项目,然后选择要运行的 Spark 的存储过程。

  3. 存储过程信息窗口中,点击调用存储过程。或者,您可以展开查看操作选项,然后点击调用

  4. 点击运行

  5. 所有结果部分中,点击查看结果

  6. 可选:在查询结果部分中,执行以下步骤:

    • 如果要查看 Spark 驱动程序日志,请点击执行详细信息

    • 如果要在 Cloud Logging 中查看日志,请点击作业信息,然后在日志字段中,点击日志

    • 如果要获取 Spark History Server 端点,请点击作业信息,然后点击 Spark History Server

SQL

如需调用存储过程,请使用 CALL PROCEDURE 语句

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,输入以下语句:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. 点击 运行

如需详细了解如何运行查询,请参阅运行交互式查询

使用自定义服务账号

您可以使用自定义服务账号在 Spark 代码中访问数据,而不是使用 Spark 连接的服务身份进行数据访问。

如需使用自定义服务账号,请在创建 Spark 存储过程时指定 INVOKER 安全模式(使用 EXTERNAL SECURITY INVOKER 语句),并在调用存储过程时指定服务账号。

如果要从 Cloud Storage 访问和使用 Spark 代码,您需要向 Spark 连接的服务身份授予必要的权限。您需要向连接的服务账号授予 storage.objects.get IAM 权限或 storage.objectViewer IAM 角色。

(可选)如果您在连接中指定了 Dataproc Metastore 和 Dataproc Persistent History Server,则可以向连接的服务账号授予对它们的访问权限。如需了解详情,请参阅向服务账号授予访问权限

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

(可选)您可以将以下参数添加到上述代码中:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

替换以下内容:

  • CUSTOM_SERVICE_ACCOUNT:必填。您提供的自定义服务账号。
  • BUCKET_NAME:可选。用作默认 Spark 应用文件系统的 Cloud Storage 存储桶。如果未提供,系统会在项目中创建默认 Cloud Storage 存储桶,在同一项目下运行的所有作业会共享该存储桶。
  • DATASET:可选。用于存储通过调用相应过程生成的临时数据的数据集。数据在作业完成后会进行清理。如果未提供,则系统会为作业创建默认临时数据集。

您的自定义服务账号必须拥有以下权限:

  • 如需对用作默认 Spark 应用文件系统的暂存存储桶进行读取和写入:

    • 针对您指定的暂存存储桶的 storage.objects.* 权限或 roles/storage.objectAdmin IAM 角色。
    • 或者,针对项目的 storage.buckets.* 权限或 roles/storage.Admin IAM 角色(如果未指定暂存存储桶)。
  • (可选)如需对 BigQuery 读取和写入数据:

    • 针对 BigQuery 表的 bigquery.tables.* 权限。
    • 针对您的项目的 bigquery.readsessions.* 权限。
    • roles/bigquery.admin IAM 角色包含上述权限。
  • (可选)如需对 Cloud Storage 读取和写入数据:

    • 针对 Cloud Storage 对象的 storage.objects.* 权限或 roles/storage.objectAdmin IAM 角色。
  • (可选)如需对用于 INOUT/OUT 参数的暂存数据集进行读取和写入:

    • 针对您指定的暂存数据集的 bigquery.tables.*roles/bigquery.dataEditor IAM 角色。
    • 或者,针对项目的 bigquery.datasets.create 权限或 roles/bigquery.dataEditor IAM 角色(如果未指定暂存数据集)。

Spark 存储过程的示例

本部分介绍如何为 Apache Spark 创建存储过程。

使用 Cloud Storage 中的 PySpark 或 JAR 文件

以下示例展示了如何使用 my-project-id.us.my-connection 连接以及存储在 Cloud Storage 存储桶中的 PySpark 或 JAR 文件为 Spark 创建存储过程:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java 或 Scala

使用 main_file_uri 创建存储过程:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

使用 main_class 创建存储过程:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

使用内嵌代码

以下示例展示了如何使用 my-project-id.us.my-connection 连接和内嵌 PySpark 代码为 Spark 创建存储过程:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

将值作为输入参数传递

以下示例展示了在 Python 中将值作为输入参数传递的两种方法:

方法 1:使用环境变量

在 PySpark 代码中,您可以通过 Spark 驱动程序和执行程序中的环境变量获取 Spark 的存储过程的输入参数。环境变量的名称采用 BIGQUERY_PROC_PARAM.PARAMETER_NAME 格式,其中 PARAMETER_NAME 是输入参数的名称。例如,如果输入参数的名称为 var,则相应环境变量的名称为 BIGQUERY_PROC_PARAM.var。输入参数采用 JSON 编码。在 PySpark 代码中,您可以从环境变量中的 JSON 字符串获取输入参数值,并将其解码为 Python 变量。

以下示例展示了如何将 INT64 类型的输入参数的值获取到 PySpark 代码中:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

方法 2:使用内置库

在 PySpark 代码中,您只需导入内置库并使用它来填充所有类型的参数。要将参数传递给执行程序,请将 Spark 驱动程序中的参数作为 Python 变量填充,并将值传递给执行程序。内置库支持大多数 BigQuery 数据类型,但 INTERVALGEOGRAPHYNUMERICBIGNUMERIC 除外。

BigQuery 数据类型 Python 数据类型
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC 不支持
BIGNUMERIC 不支持
INTERVAL 不支持
GEOGRAPHY 不支持

以下示例展示了如何导入内置库并使用它将 INT64 类型的输入参数和 ARRAY<STRUCT<a INT64, b STRING>> 类型的输入参数填充到 PySpark 代码中:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

在 Java 或 Scala 代码中,您可以通过 Spark 驱动程序和执行程序中的环境变量获取 Spark 的存储过程的输入参数。环境变量的名称采用 BIGQUERY_PROC_PARAM.PARAMETER_NAME 格式,其中 PARAMETER_NAME 是输入参数的名称。例如,如果输入参数的名称为 var,则相应环境变量的名称为 BIGQUERY_PROC_PARAM.var。在 Java 或 Scala 代码中,您可以从环境变量获取输入参数值。

以下示例展示了如何将输入环境变量的值从环境变量获取到 Scala 代码中:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

以下示例展示了如何将环境变量中的输入参数获取到 Java 代码中:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

将值作为 OUTINOUT 参数传递

输出参数从 Spark 过程返回值,而 INOUT 参数接受该过程的值并从过程返回值。如需使用 OUTINOUT 参数,请在创建 Spark 过程时在参数名称之前添加 OUTINOUT 关键字。在 PySpark 代码中,您可以使用内置库将值作为 OUTINOUT 参数返回值。与输入参数相同,内置库支持大多数 BigQuery 数据类型,但 INTERVALGEOGRAPHYNUMERICBIGNUMERIC 除外。TIMEDATETIME 类型值作为 OUTINOUT 参数返回时将转换为 UTC 时区。

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

从 Hive Metastore 表读取并将结果写入 BigQuery

以下示例展示了如何转换 Hive Metastore 表并将结果写入 BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

查看日志过滤条件

调用 Spark 的存储过程后,您可以查看日志信息。如需获取 Cloud Logging 过滤条件信息和 Spark History Cluster 端点,请使用 bq show 命令。过滤条件信息可在子作业的 SparkStatistics 字段下获得。如需获取日志过滤条件,请按以下步骤操作:

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,列出存储过程的脚本作业的子作业:

    bq ls -j --parent_job_id=$parent_job_id

    如需了解如何获取作业 ID,请参阅查看作业详情

    输出类似于以下内容:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. 确定存储过程的 jobId,并使用 bq show 命令来查看作业的详细信息:

    bq show --format=prettyjson --job $child_job_id

    复制 sparkStatistics 字段,因为您需要在另一个步骤中用到它。

    输出类似于以下内容:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. 对于 Logging,请使用 SparkStatistics 字段生成日志过滤条件

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    日志会写入 bigquery.googleapis.com/SparkJob 受监控的资源。日志由 INFODRIVEREXECUTOR 组件标记。如需从 Spark 驱动程序过滤日志,请将 labels.component = "DRIVER" 组件添加到日志过滤条件。如需从 Spark 执行程序过滤日志,请将 labels.component = "EXECUTOR" 组件添加到日志过滤条件。

使用客户管理的加密密钥

BigQuery Spark 过程使用客户管理的加密密钥 (CMEK) 以及 BigQuery 提供的默认加密来保护您的内容。如需在 Spark 过程中使用 CMEK,请先触发 BigQuery 加密服务账号的创建并授予所需权限。Spark 过程还支持 CMEK 组织政策(如果这些政策适用于您的项目)。

如果存储过程使用 INVOKER 安全模式,则您应在调用过程时通过 SQL 系统变量指定 CMEK。否则,您可以通过与存储过程关联的连接指定 CMEK。

如需在创建 Spark 存储过程时通过连接指定 CMEK,请使用以下示例代码:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

如需在调用过程时通过 SQL 系统变量指定 CMEK,请使用以下示例代码:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

使用 VPC Service Controls

借助 VPC Service Controls,您可以设置安全边界以防范数据渗漏。如需将 VPC Service Controls 与 Spark 过程搭配使用以提高安全性,请先创建服务边界

如需全面保护 Spark 过程作业,请将以下 API 添加到服务边界:

  • BigQuery API (bigquery.googleapis.com)
  • Cloud Logging API (logging.googleapis.com)
  • Cloud Storage API (storage.googleapis.com)(如果您使用 Cloud Storage)
  • Artifact Registry API (artifactregistry.googleapis.com) 或 Container Registry API (containerregistry.googleapis.com)(如果您使用自定义容器)
  • Dataproc Metastore API (metastore.googleapis.com) 和 Cloud Run Admin API (run.googleapis.com)(如果您使用 Dataproc Metastore)

将 Spark 过程的查询项目添加到边界中。将托管 Spark 代码或数据的其他项目添加到边界中。

最佳实践

  • 首次在项目中使用连接时,预配大约需要一分钟时间。为了节省时间,您可以在创建 Spark 存储过程时重复使用现有 Spark 连接。

  • 创建用于生产用途的 Spark 过程时,Google 建议您指定运行时版本。如需查看受支持的运行时版本列表,请参阅 Dataproc Serverless 运行时版本。我们建议使用长期支持 (LTS) 版本。

  • 在 Spark 过程中指定自定义容器时,我们建议您使用 Artifact Registry 和映像流式传输

  • 为了提高性能,您可以在 Spark 过程中指定资源分配属性。Spark 存储过程支持与 Dataproc Serverless 相同的资源分配属性列表。

限制

  • 您只能使用 gRPC 端点协议连接到 Dataproc Metastore。其他类型的 Hive Metastore 尚不受支持。
  • 客户管理的加密密钥 (CMEK) 仅在客户创建单区域 Spark 过程时可用。不支持全球区域 CMEK 密钥和多区域 CMEK 密钥,例如 EUUS
  • 只有 PySpark 才支持传递输出参数。
  • 如果通过跨区域数据集复制将 Spark 存储过程关联的数据集复制到目标区域,则只能在其创建区域查询存储过程。

配额和限制

如需了解配额和限制,请参阅 Spark 存储过程的配额和限制

后续步骤