Trabaja con procedimientos almacenados para Apache Spark

Este documento está dirigido a ingenieros de datos, científicos de datos y analistas de datos para crear y llamar a procedimientos almacenados para Spark en BigQuery.

Con BigQuery, puedes crear y ejecutar los procedimientos almacenados de Spark que están escritos en Python, Java y Scala. Luego, puedes ejecutar estos procedimientos almacenados en BigQuery con una consulta de GoogleSQL, de manera similar a la ejecución de procedimientos almacenados en SQL.

Antes de comenzar

Para crear un procedimiento almacenado para Spark, pídele a tu administrador que cree una conexión de Spark y la comparta contigo. El administrador también debe otorgar a la cuenta de servicio asociada con la conexión los permisos necesarios de Identity and Access Management (IAM).

Roles obligatorios

Para obtener los permisos que necesitas para hacer las tareas de este documento, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Estos roles predefinidos contienen los permisos necesarios para hacer las tareas de este documento. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para hacer las tareas de este documento:

  • Crea una conexión:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crea un procedimiento almacenado para Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Llama a un procedimiento almacenado para Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Consideración de la ubicación

Debes crear un procedimiento almacenado para Spark en la misma ubicación que tu conexión, ya que el procedimiento almacenado se ejecuta en la misma ubicación que la conexión. Por ejemplo, para crear un procedimiento almacenado en la multirregión de EE.UU., usa una conexión ubicada en la multirregión de EE.UU.

Precios

  • Los cargos por ejecutar procedimientos de Spark en BigQuery son similares a los cargos por ejecutar procedimientos de Spark en Dataproc Serverless. Para obtener más información, consulta Precios de Dataproc Serverless.

  • Los procedimientos almacenados de Spark se pueden usar con el modelo de precios a pedido y con cualquiera de las ediciones de BigQuery. Los procedimientos de Spark se cobran mediante el modelo de pago por uso de la edición Enterprise de BigQuery en todos los casos, sin importar el modelo de precios de procesamiento que se usa en tu proyecto.

  • Los procedimientos almacenados de Spark para BigQuery no admiten el uso de reservas ni compromisos. Las reservas y los compromisos existentes se siguen usando para otras consultas y procedimientos compatibles. Los cargos por el uso de los procedimientos almacenados de Spark se agregarán a tu factura según el costo de pago por uso de la edición Enterprise. Se aplican los descuentos de tu organización, cuando corresponda.

  • Si bien los procedimientos almacenados de Spark usan un motor de ejecución de Spark, no verás cargos separados por la ejecución de Spark. Como se indicó, los cargos correspondientes se informan como un SKU de pago por uso de la edición Enterprise de BigQuery.

  • Los procedimientos almacenados de Spark no ofrecen un nivel gratuito.

Crea un procedimiento almacenado para Spark

Debes crear el procedimiento almacenado en la misma ubicación de la conexión que usas.

Si el cuerpo del procedimiento almacenado es de más de 1 MB, te recomendamos que coloques el procedimiento almacenado en un archivo en un bucket de Cloud Storage en lugar de usar código intercalado. BigQuery proporciona dos métodos para crear un procedimiento almacenado para Spark con Python:

Usa el Editor de consultas en SQL

Para crear un procedimiento almacenado para Spark en el editor de consultas en SQL, sigue estos pasos:

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, agrega el código de muestra para la declaración CREATE PROCEDURE que aparece.

    Como alternativa, en el panel Explorador, haz clic en la conexión del proyecto que usaste para crear el recurso de conexión. Luego, para crear un procedimiento almacenado para Spark, haz clic en Crear procedimiento almacenado.

    Python

    Si quieres crear un procedimiento almacenado para Spark en Python, usa el siguiente código de muestra:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java o Scala

    Para crear un procedimiento almacenado para Spark en Java o Scala con la opción main_file_uri, usa el siguiente código de muestra:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Para crear un procedimiento almacenado para Spark en Java o Scala con las opciones main_class y jar_uris, usa el siguiente código de muestra:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Reemplaza lo siguiente:

    • PROJECT_ID: el proyecto en el que deseas crear el procedimiento almacenado, por ejemplo, myproject.
    • DATASET: el conjunto de datos en el que deseas crear el procedimiento almacenado, por ejemplo, mydataset
    • PROCEDURE_NAME: el nombre del procedimiento almacenado que deseas ejecutar en BigQuery, por ejemplo, mysparkprocedure.
    • PROCEDURE_ARGUMENT: un parámetro para escribir los argumentos de entrada.

      En este parámetro, especifica los siguientes campos:

      • ARGUMENT_MODE: el modo del argumento.

        Los valores válidos incluyen IN, OUT y INOUT. De forma predeterminada, el valor es IN.

      • ARGUMENT_NAME: el nombre del argumento.
      • ARGUMENT_TYPE: el tipo de argumento.

      Por ejemplo: myproject.mydataset.mysparkproc(num INT64).

      Para obtener más información, consulta cómo pasar un valor como un parámetro IN o los parámetros OUT y INOUT en este documento.

    • CONNECTION_PROJECT_ID: el proyecto que contiene la conexión para ejecutar el procedimiento de Spark.
    • CONNECTION_REGION: la región que contiene la conexión para ejecutar el procedimiento de Spark, por ejemplo, us.
    • CONNECTION_ID: el ID de conexión, por ejemplo, myconnection.

      Cuando ves los detalles de conexión en la consola de Google Cloud, el ID de conexión es el valor en la última sección del ID de conexión completamente calificado que se muestra en ID de conexión, por ejemplo projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: la versión del entorno de ejecución de Spark, por ejemplo, 1.1.
    • MAIN_PYTHON_FILE_URI: la ruta a un archivo de PySpark, por ejemplo gs://mybucket/mypysparkmain.py.

      Como alternativa, si deseas agregar el cuerpo del procedimiento almacenado en la declaración CREATE PROCEDURE, agrega PYSPARK_CODE después de LANGUAGE PYTHON AS como se muestra en el ejemplo de Usa código intercalado en este documento.

    • PYSPARK_CODE: la definición de una aplicación de PySpark en la declaración CREATE PROCEDURE si deseas pasar el cuerpo del procedimiento intercalado.

      El valor es un literal de string. Si el código incluye comillas y barras invertidas, se deben escapar o representar como una cadena sin procesar. Por ejemplo, el código "\n"; se puede representar como una de las siguientes opciones:

      • String entrecomillada: "return \"\\n\";" Se evitan las comillas y las barras invertidas.
      • String entre comillas triples: """return "\\n";""". Las barras invertidas se escapan, mientras que las comillas no.
      • String sin procesar: r"""return "\n";""". No se necesita escape.
      Si deseas obtener información para agregar código PySpark intercalado, consulta Usa código intercalado.
    • MAIN_JAR_URI: la ruta del archivo JAR que contiene la clase main, por ejemplo, gs://mybucket/my_main.jar.
    • CLASS_NAME: el nombre completamente calificado de una clase en un conjunto de JAR con la opción jar_uris, por ejemplo, com.example.wordcount.
    • URI: la ruta del archivo JAR que contiene la clase especificada en la clase main, por ejemplo, gs://mybucket/mypysparkmain.jar.

    Para ver las opciones adicionales que puedes especificar en OPTIONS, consulta la lista de opciones de procedimiento.

Usa el editor de PySpark

Cuando creas un procedimiento a través del editor de PySpark, no necesitas usar la declaración CREATE PROCEDURE. En su lugar, agrega tu código de Python directamente en el editor de Pyspark y guarda o ejecuta tu código.

Para crear un procedimiento almacenado para Spark en el editor de PySpark, sigue estos pasos:

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. Si deseas escribir el código de PySpark directamente, abre el editor de PySpark. Para abrir el editor de PySpark, haz clic en el menú junto a Crear consulta en SQL y, luego, selecciona Crear procedimiento de PySpark.

  3. Para configurar las opciones, haz clic en Más > Opciones de PySpark y, luego, haz lo siguiente:

    1. Especifica la ubicación en la que deseas ejecutar el código de PySpark.

    2. En el campo Conexión, especifica la conexión a Spark.

    3. En la sección Invocación de procedimiento almacenado, especifica el conjunto de datos en el que deseas almacenar los procedimientos almacenados temporales que se generan. Puedes configurar un conjunto de datos específico o permitir que un conjunto de datos temporal invoque el código de PySpark.

      El conjunto de datos temporal se genera con la ubicación especificada en el paso anterior. Si se especifica un nombre de conjunto de datos, asegúrate de que el conjunto de datos y la conexión de Spark estén en la misma ubicación.

    4. En la sección Parámetros, define los parámetros para el procedimiento almacenado. El valor del parámetro solo se usa durante las ejecuciones en sesión del código de PySpark, pero la declaración se almacena en el procedimiento.

    5. En la sección Opciones avanzadas, especifica las opciones del procedimiento. Para obtener una lista detallada de las opciones de procedimiento, consulta la lista de opciones de procedimiento.

    6. En la sección Propiedades, agrega los pares clave-valor para configurar el trabajo. Puedes usar cualquiera de los pares clave-valor de las propiedades de Spark de Dataproc Serverless.

    7. En Configuración de la cuenta de servicio, especifica la cuenta de servicio personalizada, la CMEK, el conjunto de datos de etapa de pruebas y la carpeta de etapa de pruebas de Cloud Storage que se usará durante las ejecuciones en sesión del código de PySpark.

    8. Haz clic en Guardar.

Guarda un procedimiento almacenado para Spark

Después de crear el procedimiento almacenado a través del editor de PySpark, puedes guardar el procedimiento almacenado. Para hacerlo, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, crea un procedimiento almacenado para Spark con Python con el editor de PySpark.

  3. Haz clic en Guardar > Guardar procedimiento.

  4. En el cuadro de diálogo Guardar procedimiento almacenado, especifica el nombre del conjunto de datos en el que deseas almacenar el procedimiento almacenado y el nombre del procedimiento almacenado.

  5. Haz clic en Guardar.

    Si solo deseas ejecutar el código de PySpark en lugar de guardarlo como un procedimiento almacenado, puedes hacer clic en Ejecutar en lugar de Guardar.

Usar contenedores personalizados

El contenedor personalizado proporciona el entorno de ejecución para los procesos del controlador y del ejecutor de la carga de trabajo. Para usar contenedores personalizados, usa el siguiente código de muestra:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Reemplaza lo siguiente:

  • PROJECT_ID: el proyecto en el que deseas crear el procedimiento almacenado, por ejemplo, myproject.
  • DATASET: el conjunto de datos en el que deseas crear el procedimiento almacenado, por ejemplo, mydataset.
  • PROCEDURE_NAME: el nombre del procedimiento almacenado que deseas ejecutar en BigQuery, por ejemplo, mysparkprocedure.
  • PROCEDURE_ARGUMENT: un parámetro para escribir los argumentos de entrada.

    En este parámetro, especifica los siguientes campos:

    • ARGUMENT_MODE: el modo del argumento.

      Los valores válidos incluyen IN, OUT y INOUT. De forma predeterminada, el valor es IN.

    • ARGUMENT_NAME: el nombre del argumento.
    • ARGUMENT_TYPE: el tipo de argumento.

    Por ejemplo: myproject.mydataset.mysparkproc(num INT64).

    Para obtener más información, consulta cómo pasar un valor como un parámetro IN o los parámetros OUT y INOUT en este documento.

  • CONNECTION_PROJECT_ID: el proyecto que contiene la conexión para ejecutar el procedimiento de Spark.
  • CONNECTION_REGION: la región que contiene la conexión para ejecutar el procedimiento de Spark, por ejemplo, us.
  • CONNECTION_ID: el ID de conexión, por ejemplo, myconnection.

    Cuando ves los detalles de conexión en la consola de Google Cloud, el ID de conexión es el valor en la última sección del ID de conexión por completo calificado que se muestra en ID de conexión, por ejemplo projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: la versión del entorno de ejecución de Spark, por ejemplo, 1.1.
  • MAIN_PYTHON_FILE_URI: la ruta a un archivo de PySpark, por ejemplo gs://mybucket/mypysparkmain.py.

    Como alternativa, si deseas agregar el cuerpo del procedimiento almacenado en la declaración CREATE PROCEDURE, agrega PYSPARK_CODE después de LANGUAGE PYTHON AS como se muestra en el ejemplo de Usa código intercalado en este documento.

  • PYSPARK_CODE: la definición de una aplicación de PySpark en la declaración CREATE PROCEDURE si deseas pasar el cuerpo del procedimiento intercalado.

    El valor es un literal de string. Si el código incluye comillas y barras invertidas, se deben escapar o representar como una cadena sin procesar. Por ejemplo, el código "\n"; se puede representar como una de las siguientes opciones:

    • String entrecomillada: "return \"\\n\";" Se evitan las comillas y las barras invertidas.
    • String entre comillas triples: """return "\\n";""". Las barras invertidas se escapan, mientras que las comillas no.
    • String sin procesar: r"""return "\n";""". No se necesita escape.
    Si deseas obtener información para agregar código PySpark intercalado, consulta Usa código intercalado.
  • CONTAINER_IMAGE: Es la ruta de acceso de la imagen en el registro de artefactos. Solo debe contener bibliotecas para usar en tu procedimiento. Si no se especifica, se usa la imagen de contenedor predeterminada del sistema asociada con la versión del entorno de ejecución.

Para obtener más información sobre cómo compilar una imagen de contenedor personalizada con Spark, consulta Compila una imagen de contenedor personalizada.

Llama a un procedimiento almacenado para Spark

Después de crear un procedimiento almacenado, puedes llamarlo con una de las siguientes opciones:

Console

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y elige el procedimiento almacenado para Spark que deseas ejecutar.

  3. En la ventana Información del procedimiento almacenado, haz clic en Invocar procedimiento almacenado. De manera alternativa, puedes expandir la opción Ver acciones y hacer clic enInvocar.

  4. Haz clic en Ejecutar.

  5. En la sección Todos los resultados, haz clic en Ver resultados.

  6. Opcional: En la sección Resultados de la consulta, sigue estos pasos:

    • Si deseas ver los registros del controlador Spark, haz clic en Detalles de ejecución.

    • Si deseas ver los registros en Cloud Logging, haz clic en Información del trabajo y, luego, en el campo Registro, haz clic enregistro.

    • Si deseas obtener el extremo del servidor de historial de Spark, haz clic en Información del trabajo y, luego, en Servidor de historial de Spark.

SQL

Para llamar a un procedimiento almacenado, usa la declaración CALL PROCEDURE:

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, escribe la siguiente sentencia:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Haz clic en Ejecutar.

Si deseas obtener información sobre cómo ejecutar consultas, visita Ejecuta una consulta interactiva.

Usar una cuenta de servicio personalizada

En lugar de usar la identidad del servicio de la conexión de Spark para el acceso a los datos, puedes usar una cuenta de servicio personalizada a fin de acceder a los datos dentro de tu código de Spark.

Para usar una cuenta de servicio personalizada, especifica el modo de seguridad INVOKER (con la declaración EXTERNAL SECURITY INVOKER) cuando crees un procedimiento almacenado de Spark, y especifica la cuenta de servicio cuando invoques el procedimiento almacenado.

Si deseas acceder al código de Spark y usarlo desde Cloud Storage, debes otorgar los permisos necesarios a la identificación del servicio de la conexión de Spark. Debes otorgar a la cuenta de servicio de la conexión el permiso de IAM storage.objects.get o el rol de IAM storage.objectViewer.

De manera opcional, puedes otorgar a la cuenta de servicio de la conexión acceso a Dataproc Metastore y al servidor del historial persistente de Dataproc si los especificaste en la conexión. Para obtener más información, consulta Otorga acceso a la cuenta de servicio.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

De manera opcional, puedes agregar los siguientes argumentos al código anterior:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Reemplaza lo siguiente:

  • CUSTOM_SERVICE_ACCOUNT: Obligatorio. Una cuenta de servicio personalizada que proporciones.
  • BUCKET_NAME: Opcional El bucket de Cloud Storage que se usa como el sistema de archivos predeterminado de la aplicación Spark. Si no se proporciona, se crea un bucket de Cloud Storage predeterminado en tu proyecto y el bucket se comparte entre todos los trabajos que se ejecutan en el mismo proyecto.
  • DATASET: Opcional El conjunto de datos para almacenar los datos temporales producidos por la invocación el procedimiento. Los datos se limpian después de que se completa el trabajo. Si no se proporciona, se crea un conjunto de datos temporal predeterminado para el trabajo.

Tu cuenta de servicio personalizada debe tener los siguientes permisos:

  • Para leer y escribir en el bucket de etapa de pruebas usado como sistema de archivos predeterminado de la aplicación de Spark, haz lo siguiente:

    • Los permisos storage.objects.* o el rol de IAM roles/storage.objectAdmin en el bucket de etapa de pruebas que especifiques.
    • Además, los permisos storage.buckets.* o el rol de IAM roles/storage.Admin en el proyecto si no se especifica el bucket de etapa de pruebas
  • (Opcional) Para leer y escribir datos desde y hacia BigQuery, haz lo siguiente:

    • bigquery.tables.* en tus tablas de BigQuery.
    • bigquery.readsessions.* en tu proyecto
    • El rol de IAM roles/bigquery.admin incluye los permisos anteriores.
  • (Opcional) Para leer y escribir datos desde y hacia Cloud Storage:

    • Los permisos storage.objects.* o el rol de IAM roles/storage.objectAdmin en tus objetos de Cloud Storage.
  • (Opcional) Para leer y escribir en el conjunto de datos de etapa de pruebas que se usa para los parámetros INOUT/OUT, haz lo siguiente:

    • Rol de IAM bigquery.tables.* o roles/bigquery.dataEditor en el conjunto de datos de etapa de pruebas que especifiques.
    • Además, el permiso bigquery.datasets.create o el rol de IAM roles/bigquery.dataEditor en el proyecto si no se especifica el conjunto de datos de etapa de pruebas.

Ejemplos de procedimientos almacenados para Spark

En esta sección, se muestran ejemplos sobre cómo crear un procedimiento almacenado para Apache Spark.

Usa un archivo PySpark o JAR en Cloud Storage

En el siguiente ejemplo, se muestra cómo crear un procedimiento almacenado para Spark a través de la conexión my-project-id.us.my-connection y un archivo PySpark o JAR que se almacena en un bucket de Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java o Scala

Usa main_file_uri para crear un procedimiento almacenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Usa main_class para crear un procedimiento almacenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Usa código intercalado

En el siguiente ejemplo, se muestra cómo crear un procedimiento almacenado para Spark a través de la conexión my-project-id.us.my-connection y el código de PySpark intercalado:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Pasa un valor como un parámetro de entrada

En los siguientes ejemplos, se muestran los dos métodos para pasar un valor como un parámetro de entrada en Python:

Método 1: Usa variables de entorno

En el código de PySpark, puedes obtener los parámetros de entrada del procedimiento almacenado para Spark a través de variables de entorno en el controlador y los ejecutores de Spark. El nombre de la variable de entorno tiene el formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, en el que PARAMETER_NAME es el nombre del parámetro de entrada. Por ejemplo, si el nombre del parámetro de entrada var, el nombre de la variable de entorno correspondiente es BIGQUERY_PROC_PARAM.var. Los parámetros de entrada están codificados en JSON. En tu código de PySpark, puedes obtener el valor del parámetro de entrada en una cadena JSON de la variable de entorno y decodificarlo a una variable de Python.

En el siguiente ejemplo, se muestra cómo obtener el valor de un parámetro de entrada del tipo INT64 en el código de PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Método 2: Usa una biblioteca integrada

En el código de PySpark, simplemente puedes importar una biblioteca integrada y usarla para propagar todos los tipos de parámetros. Para pasar los parámetros a los ejecutores, propaga los parámetros en un controlador de Spark como variables de Python y pasa los valores a los ejecutores. La biblioteca integrada admite la mayoría de los tipos de datos de BigQuery, excepto INTERVAL, GEOGRAPHY, NUMERIC y BIGNUMERIC.

Tipo de datos de BigQuery Tipo de datos de Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC No compatible
BIGNUMERIC No compatible
INTERVAL No compatible
GEOGRAPHY No compatible

En el siguiente ejemplo, se muestra cómo importar la biblioteca integrada y usarla para propagar un parámetro de entrada del tipo INT64 y un parámetro de entrada del tipo ARRAY<STRUCT<a INT64, b STRING>> a tu código de PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

En el código de Java o Scala, puedes obtener los parámetros de entrada del procedimiento almacenado para Spark a través de variables de entorno en el controlador y los ejecutores de Spark. El nombre de la variable de entorno tiene el formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, en el que PARAMETER_NAME es el nombre del parámetro de entrada. Por ejemplo, si el nombre del parámetro de entrada es var, el nombre de la variable de entorno correspondiente es BIGQUERY_PROC_PARAM.var. En tu código de Java o Scala, puedes obtener el valor del parámetro de entrada de la variable de entorno.

En el siguiente ejemplo, se muestra cómo obtener el valor de un parámetro de entrada de las variables de entorno en el código de Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

En el siguiente ejemplo, se muestra cómo obtener parámetros de entrada de las variables de entorno en el código de Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Pasa valores como los parámetros OUT y INOUT

Los parámetros de salida muestran el valor del procedimiento de Spark, mientras que el parámetro INOUT acepta un valor para el procedimiento y muestra un valor del procedimiento. Para usar los parámetros OUT y INOUT, agrega las palabras clave OUT o INOUT antes del nombre del parámetro cuando crees el procedimiento de Spark. En el código de PySpark, usa la biblioteca integrada para mostrar un valor como un parámetro OUT o INOUT. Al igual que los parámetros de entrada, la biblioteca integrada admite la mayoría de los tipos de datos de BigQuery, excepto INTERVAL, GEOGRAPHY, NUMERIC y BIGNUMERIC. Los valores de tipo TIME y DATETIME se convierten en la zona horaria UTC cuando se muestran como los parámetros OUT o INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Lee desde una tabla de Hive Metastore y escribir resultados en BigQuery

En el siguiente ejemplo, se muestra cómo transformar una tabla de Hive Metastore y escribir los resultados en BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Visualiza filtros de registros

Después de llamar a un procedimiento almacenado para Spark, puedes ver la información de registro. Para obtener la información del filtro de Cloud Logging y el extremo del clúster del historial de Spark, usa el comando bq show. La información del filtro está disponible en el campo SparkStatistics del trabajo secundario. Para obtener filtros de registro, sigue estos pasos:

  1. Ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, enumera los trabajos secundarios del trabajo de secuencia de comandos del procedimiento almacenado:

    bq ls -j --parent_job_id=$parent_job_id

    Para aprender a obtener el ID del trabajo, consulta Visualiza los detalles del trabajo.

    El resultado es similar a este:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifica el jobId para tu procedimiento almacenado y usa el comando bq show con el objetivo de ver los detalles del trabajo:

    bq show --format=prettyjson --job $child_job_id

    Copia el campo sparkStatistics porque lo necesitarás en otro paso.

    El resultado es similar a este:

    {
    "configuration": {...}
    …
    "statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Para Logging, genera filtros de registro con los campos SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Los registros se escriben en el recurso supervisado bigquery.googleapis.com/SparkJob. Los componentes INFO, DRIVER y EXECUTOR etiquetan los registros. Para filtrar registros del controlador de Spark, agrega el componente labels.component = "DRIVER" a los filtros de registro. Para filtrar registros del ejecutor de Spark, agrega el componente labels.component = "EXECUTOR" a los filtros de registro.

Usa la clave de encriptación administrada por el cliente

El procedimiento de Spark de BigQuery usa la clave de encriptación administrada por el cliente (CMEK) para proteger tu contenido, junto con la encriptación predeterminada que proporciona BigQuery. Para usar la CMEK en el procedimiento de Spark, primero activa la creación de la cuenta de servicio de encriptación de BigQuery y otorga los permisos necesarios. El procedimiento de Spark también admite las políticas de la organización de CMEK si se aplican a tu proyecto.

Si tu procedimiento almacenado usa el modo de seguridad INVOKER, las CMEK deben especificarse a través de la variable de sistema SQL cuando se llama al procedimiento. De lo contrario, las CMEK se pueden especificar a través de la conexión asociada con el procedimiento almacenado.

Para especificar las CMEK a través de la conexión cuando creas un procedimiento almacenado de Spark, usa el siguiente código de muestra:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Para especificar CMEK a través de la variable del sistema de SQL cuando llamas al procedimiento, usa el siguiente código de muestra:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Usa los Controles del servicio de VPC

Los Controles del servicio de VPC te permiten configurar un perímetro seguro para protegerte contra el robo de datos. Si quieres usar los Controles del servicio de VPC con un procedimiento de Spark para obtener seguridad adicional, primero crea un perímetro de servicio.

Para proteger por completo tus trabajos de procedimiento de Spark, agrega las siguientes API al perímetro de servicio:

  • API de BigQuery (bigquery.googleapis.com)
  • API de Cloud Logging (logging.googleapis.com)
  • La API de Cloud Storage (storage.googleapis.com), si usas Cloud Storage
  • La API de Artifact Registry (artifactregistry.googleapis.com) o la API de Container Registry (containerregistry.googleapis.com), si usas un contenedor personalizado
  • La API de Dataproc Metastore (metastore.googleapis.com) y la API de Cloud Run Admin (run.googleapis.com), si usas Dataproc Metastore

Agrega el proyecto de consulta del procedimiento de Spark en el perímetro. Agrega otros proyectos que alojen el código o los datos de Spark en el perímetro.

Prácticas recomendadas

  • Cuando usas una conexión en tu proyecto por primera vez, el aprovisionamiento tarda alrededor de un minuto. Para ahorrar tiempo, puedes volver a usar una conexión existente de Spark cuando crees un procedimiento almacenado para Spark.

  • Cuando creas un procedimiento de Spark para su uso en producción, Google recomienda especificar una versión del entorno de ejecución. Para obtener una lista de las versiones compatibles del entorno de ejecución, consulta Versiones del entorno de ejecución de Dataproc Serverless. Recomendamos usar la versión de asistencia a largo plazo (LTS).

  • Cuando especificas un contenedor personalizado en un procedimiento de Spark, recomendamos usar Artifact Registry y la transmisión de imágenes.

  • Para obtener un mejor rendimiento, puedes especificar propiedades de asignación de recursos en el procedimiento de Spark. Los procedimientos almacenados de Spark admiten una lista de propiedades de asignación de recursos igual que Dataproc Serverless.

Limitaciones

Cuotas y límites

Si quieres obtener información sobre las cuotas y los límites, consulta Procedimientos almacenados para cuotas y límites de Spark.

¿Qué sigue?