Trabajar con procedimientos almacenados de Apache Spark

Este documento está dirigido a ingenieros, científicos y analistas de datos que quieran crear y llamar a procedimientos almacenados para Spark en BigQuery.

Con BigQuery, puedes crear y ejecutar Spark escritos en Python, Java y Scala. Después, puedes ejecutar estos procedimientos almacenados en BigQuery con una consulta de GoogleSQL, de forma similar a cuando se ejecutan los procedimientos almacenados en SQL.

Antes de empezar

Para crear un procedimiento almacenado para Spark, pide a tu administrador que cree una conexión de Spark y la comparta contigo. Además, tu administrador debe conceder a la cuenta de servicio asociada a la conexión los permisos de gestión de identidades y accesos (IAM) necesarios.

Roles obligatorios

Para obtener los permisos que necesitas para llevar a cabo las tareas de este documento, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

Estos roles predefinidos contienen los permisos necesarios para realizar las tareas de este documento. Para ver los permisos exactos que se necesitan, despliega la sección Permisos necesarios:

Permisos obligatorios

Para realizar las tareas de este documento, se necesitan los siguientes permisos:

  • Crea una conexión:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crea un procedimiento almacenado para Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Llama a un procedimiento almacenado de Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

También puedes obtener estos permisos con roles personalizados u otros roles predefinidos.

Consideraciones sobre la ubicación

Debes crear un procedimiento almacenado para Spark en la misma ubicación que tu conexión porque el procedimiento almacenado se ejecuta en la misma ubicación que la conexión. Por ejemplo, para crear un procedimiento almacenado en la multirregión de EE. UU., debes usar una conexión ubicada en la multirregión de EE. UU.

Precios

  • Los cargos por ejecutar procedimientos de Spark en BigQuery son similares a los cargos por ejecutar procedimientos de Spark en Serverless para Apache Spark. Para obtener más información, consulta los precios de Serverless para Apache Spark.

  • Los procedimientos almacenados de Spark se pueden usar con el modelo de precios bajo demanda, así como con cualquiera de las ediciones de BigQuery. Los procedimientos de Spark se cobran con el modelo de pago por uso de la edición Enterprise de BigQuery en todos los casos, independientemente del modelo de precios de computación que se utilice en tu proyecto.

  • Los procedimientos almacenados de Spark para BigQuery no admiten el uso de reservas ni compromisos. Las reservas y los compromisos se siguen usando para otras consultas y procedimientos admitidos. Los cargos por el uso de procedimientos almacenados de Spark se añaden a tu factura con el coste de la edición Enterprise con el modelo de pago por uso. Se aplican los descuentos de tu organización, si procede.

  • Aunque los procedimientos almacenados de Spark usan un motor de ejecución de Spark, no verás cargos independientes por la ejecución de Spark. Como se ha indicado, los cargos correspondientes se registran como SKU de pago por uso de la edición Enterprise de BigQuery.

  • Los procedimientos almacenados de Spark no ofrecen un nivel gratuito.

Crear un procedimiento almacenado para Spark

Debes crear el procedimiento almacenado en la misma ubicación que la conexión que utilices.

Si el cuerpo de tu procedimiento almacenado ocupa más de 1 MB, te recomendamos que lo coloques en un archivo de un segmento de Cloud Storage en lugar de usar código insertado. BigQuery ofrece dos métodos para crear un procedimiento almacenado para Spark con Python:

Usar el editor de consultas de SQL

Para crear un procedimiento almacenado para Spark en el editor de consultas SQL, sigue estos pasos:

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, añade el código de ejemplo de la CREATE PROCEDURE sentencia que aparece.

    También puedes hacer clic en la conexión del proyecto en el panel Explorador que has usado para crear el recurso de conexión. A continuación, para crear un procedimiento almacenado para Spark, haz clic en Crear procedimiento almacenado.

    Python

    Para crear un procedimiento almacenado para Spark en Python, usa el siguiente código de muestra:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java o Scala

    Para crear un procedimiento almacenado para Spark en Java o Scala con la opción main_file_uri, usa el siguiente código de ejemplo:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Para crear un procedimiento almacenado para Spark en Java o Scala con las opciones main_class y jar_uris, usa el siguiente código de ejemplo:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Haz los cambios siguientes:

    • PROJECT_ID: el proyecto en el que quieres crear el procedimiento almacenado. Por ejemplo, myproject.
    • DATASET: el conjunto de datos en el que quieres crear el procedimiento almacenado. Por ejemplo, mydataset.
    • PROCEDURE_NAME: el nombre del procedimiento almacenado que quieres ejecutar en BigQuery. Por ejemplo, mysparkprocedure.
    • PROCEDURE_ARGUMENT: un parámetro para introducir los argumentos de entrada.

      En este parámetro, especifica los siguientes campos:

      • ARGUMENT_MODE: el modo del argumento.

        Los valores válidos son IN, OUT y INOUT. De forma predeterminada, el valor es IN.

      • ARGUMENT_NAME: el nombre del argumento.
      • ARGUMENT_TYPE: el tipo del argumento.

      Por ejemplo: myproject.mydataset.mysparkproc(num INT64).

      Para obtener más información, consulta cómo pasar un valor como parámetro IN o los parámetros OUT y INOUT de este documento.

    • CONNECTION_PROJECT_ID: el proyecto que contiene la conexión para ejecutar el procedimiento de Spark.
    • CONNECTION_REGION: la región que contiene la conexión para ejecutar el procedimiento de Spark (por ejemplo, us).
    • CONNECTION_ID: el ID de conexión, por ejemplo, myconnection.

      Cuando consultas los detalles de la conexión en la consola de Google Cloud , el ID de conexión es el valor de la última sección del ID de conexión completo que se muestra en ID de conexión. Por ejemplo, projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: la versión de tiempo de ejecución de Spark (por ejemplo, 2.2).
    • MAIN_PYTHON_FILE_URI: la ruta a un archivo PySpark. Por ejemplo, gs://mybucket/mypysparkmain.py.

      También puede añadir el cuerpo del procedimiento almacenado en la instrucción CREATE PROCEDURE. Para ello, añada PYSPARK_CODE después de LANGUAGE PYTHON AS, como se muestra en el ejemplo de la sección Usar código insertado de este documento.

    • PYSPARK_CODE: la definición de una aplicación PySpark en la instrucción CREATE PROCEDURE si quieres transferir el cuerpo del procedimiento de forma insertada.

      El valor es un literal de cadena. Si el código incluye comillas y barras invertidas, deben incluirse con caracteres de escape o representarse como una cadena sin formato. Por ejemplo, el código de retorno "\n"; se puede representar de una de las siguientes formas:

      • Cadena entrecomillada: "return \"\\n\";". Las comillas y las barras invertidas tienen caracteres de escape.
      • Cadena entre comillas triples: """return "\\n";""". Las barras invertidas se escapan, pero las comillas no.
      • Cadena sin procesar: r"""return "\n";""". No es necesario usar secuencias de escape.
      Para saber cómo añadir código PySpark insertado, consulta Usar código insertado.
    • MAIN_JAR_URI: la ruta del archivo JAR que contiene la clase main. Por ejemplo, gs://mybucket/my_main.jar.
    • CLASS_NAME: el nombre completo de una clase de un conjunto JAR con la opción jar_uris. Por ejemplo, com.example.wordcount.
    • URI: la ruta del archivo JAR que contiene la clase especificada en main class, por ejemplo, gs://mybucket/mypysparkmain.jar.

    Para ver otras opciones que puedes especificar en OPTIONS, consulta la lista de opciones de procedimiento.

Usar el editor de PySpark

Cuando creas un procedimiento con el editor de PySpark, no es necesario que uses la instrucción CREATE PROCEDURE. En su lugar, añade el código Python directamente en el editor de PySpark y guarda o ejecuta el código.

Para crear un procedimiento almacenado para Spark en el editor de PySpark, sigue estos pasos:

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. Si quieres escribir el código PySpark directamente, abre el editor de PySpark. Para abrir el editor de PySpark, haz clic en el menú situado junto a Crear consulta SQL y, a continuación, selecciona Crear procedimiento de PySpark.

  3. Para definir opciones, haz clic en Más > Opciones de PySpark y, a continuación, haz lo siguiente:

    1. Especifica la ubicación en la que quieres ejecutar el código PySpark.

    2. En el campo Conexión, especifica la conexión de Spark.

    3. En la sección Invocación de procedimiento almacenado, especifica el conjunto de datos en el que quieras almacenar los procedimientos almacenados temporales que se generen. Puedes definir un conjunto de datos específico o permitir el uso de un conjunto de datos temporal para invocar el código de PySpark.

      El conjunto de datos temporal se genera con la ubicación especificada en el paso anterior. Si se especifica un nombre de conjunto de datos, asegúrate de que el conjunto de datos y la conexión de Spark estén en la misma ubicación.

    4. En la sección Parámetros, defina los parámetros del procedimiento almacenado. El valor del parámetro solo se usa durante las ejecuciones en la sesión del código de PySpark, pero la declaración en sí se almacena en el procedimiento.

    5. En la sección Opciones avanzadas, especifica las opciones del procedimiento. Para ver una lista detallada de las opciones de procedimiento, consulta la lista de opciones de procedimiento.

    1. En la sección Propiedades, añada los pares clave-valor para configurar el trabajo. Puedes usar cualquiera de los pares clave-valor de las propiedades de Spark admitidas en Serverless para Apache Spark.

    1. En Configuración de la cuenta de servicio, especifica la cuenta de servicio personalizada, la CMEK, el conjunto de datos de almacenamiento temporal y la carpeta de Cloud Storage de almacenamiento temporal que se usarán durante las ejecuciones en sesión del código PySpark.

    2. Haz clic en Guardar.

Guardar un procedimiento almacenado para Spark

Después de crear el procedimiento almacenado con el editor de PySpark, puedes guardarlo. Para ello, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, crea un procedimiento almacenado para Spark con Python en el editor de PySpark.

  3. Haz clic en Guardar > Guardar procedimiento.

  4. En el cuadro de diálogo Guardar procedimiento almacenado, especifica el nombre del conjunto de datos en el que quieres almacenar el procedimiento almacenado y el nombre del procedimiento almacenado.

  5. Haz clic en Guardar.

    Si solo quieres ejecutar el código de PySpark en lugar de guardarlo como un procedimiento almacenado, puedes hacer clic en Ejecutar en lugar de Guardar.

Usar contenedores personalizados

El contenedor personalizado proporciona el entorno de ejecución para los procesos del controlador y del ejecutor de la carga de trabajo. Para usar contenedores personalizados, usa el siguiente código de muestra:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Haz los cambios siguientes:

  • PROJECT_ID: el proyecto en el que quieres crear el procedimiento almacenado. Por ejemplo, myproject.
  • DATASET: el conjunto de datos en el que quieres crear el procedimiento almacenado. Por ejemplo, mydataset.
  • PROCEDURE_NAME: el nombre del procedimiento almacenado que quieres ejecutar en BigQuery. Por ejemplo, mysparkprocedure.
  • PROCEDURE_ARGUMENT: parámetro para introducir los argumentos de entrada.

    En este parámetro, especifica los siguientes campos:

    • ARGUMENT_MODE: el modo del argumento.

      Los valores válidos son IN, OUT y INOUT. De forma predeterminada, el valor es IN.

    • ARGUMENT_NAME: el nombre del argumento.
    • ARGUMENT_TYPE: el tipo del argumento.

    Por ejemplo: myproject.mydataset.mysparkproc(num INT64).

    Para obtener más información, consulta cómo pasar un valor como parámetro IN o los parámetros OUT y INOUT de este documento.

  • CONNECTION_PROJECT_ID: el proyecto que contiene la conexión para ejecutar el procedimiento de Spark.
  • CONNECTION_REGION: la región que contiene la conexión para ejecutar el procedimiento de Spark (por ejemplo, us).
  • CONNECTION_ID: el ID de la conexión, por ejemplo, myconnection.

    Cuando consultas los detalles de la conexión en la consola de Google Cloud , el ID de conexión es el valor de la última sección del ID de conexión completo que se muestra en ID de conexión. Por ejemplo, projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: la versión de tiempo de ejecución de Spark (por ejemplo, 2.2).
  • MAIN_PYTHON_FILE_URI: la ruta a un archivo PySpark. Por ejemplo, gs://mybucket/mypysparkmain.py.

    También puede añadir el cuerpo del procedimiento almacenado en la instrucción CREATE PROCEDURE. Para ello, añada PYSPARK_CODE después de LANGUAGE PYTHON AS, como se muestra en el ejemplo de la sección Usar código insertado de este documento.

  • PYSPARK_CODE: la definición de una aplicación PySpark en la instrucción CREATE PROCEDURE si quieres transferir el cuerpo del procedimiento de forma insertada.

    El valor es un literal de cadena. Si el código incluye comillas y barras invertidas, deben incluirse con caracteres de escape o representarse como una cadena sin formato. Por ejemplo, el código de retorno "\n"; se puede representar de una de las siguientes formas:

    • Cadena entrecomillada: "return \"\\n\";". Se aplican caracteres de escape a las comillas y a las barras invertidas.
    • Cadena entre comillas triples: """return "\\n";""". Las barras invertidas se escapan, pero las comillas no.
    • Cadena sin procesar: r"""return "\n";""". No es necesario usar secuencias de escape.
    Para saber cómo añadir código PySpark en línea, consulta Usar código en línea.
  • CONTAINER_IMAGE: ruta de la imagen en el registro de artefactos. Solo debe contener bibliotecas que se puedan usar en tu procedimiento. Si no se especifica, se usa la imagen de contenedor predeterminada del sistema asociada a la versión del tiempo de ejecución.

Para obtener más información sobre cómo crear una imagen de contenedor personalizada con Spark, consulta Crear una imagen de contenedor personalizada.

Llamar a un procedimiento almacenado de Spark

Después de crear un procedimiento almacenado, puedes llamarlo mediante una de las siguientes opciones:

Consola

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y selecciona el procedimiento almacenado de Spark que quieras ejecutar.

  3. En la ventana Información del procedimiento almacenado, haga clic en Invocar el procedimiento almacenado. También puedes desplegar la opción Ver acciones y hacer clic en Invocar.

  4. Haz clic en Ejecutar.

  5. En la sección Todos los resultados, haz clic en Ver resultados.

  6. Opcional: En la sección Resultados de la consulta, siga estos pasos:

    • Si quieres ver los registros del controlador de Spark, haz clic en Detalles de la ejecución.

    • Si quieres ver los registros en Cloud Logging, haz clic en Información del trabajo y, a continuación, en el campo Registro, haz clic en registro.

    • Si quieres obtener el endpoint del servidor de historial de Spark, haz clic en Información del trabajo y, a continuación, en Servidor de historial de Spark.

SQL

Para llamar a un procedimiento almacenado, usa la CALL PROCEDURE instrucción:

  1. En la Google Cloud consola, ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, introduce la siguiente instrucción:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Haz clic en Ejecutar.

Para obtener más información sobre cómo ejecutar consultas, consulta Ejecutar una consulta interactiva.

Usar una cuenta de servicio personalizada

En lugar de usar la identidad de servicio de la conexión de Spark para acceder a los datos, puedes usar una cuenta de servicio personalizada para acceder a los datos de tu código de Spark.

Para usar una cuenta de servicio personalizada, especifica el modo de seguridad INVOKER (con la instrucción EXTERNAL SECURITY INVOKER) al crear un procedimiento almacenado de Spark y especifica la cuenta de servicio al invocar el procedimiento almacenado.

Cuando ejecutes el procedimiento almacenado de Spark con la cuenta de servicio personalizada por primera vez, BigQuery creará un agente de servicio de Spark y le concederá los permisos necesarios. Asegúrate de no modificar esta concesión antes de invocar el procedimiento almacenado de Spark. Para obtener más información, consulta el artículo Agente de servicio de BigQuery Spark.

Si quieres acceder y usar código de Spark desde Cloud Storage, debes conceder los permisos necesarios a la identidad de servicio de la conexión de Spark. Debes conceder a la cuenta de servicio de la conexión el permiso storage.objects.get de gestión de identidades y accesos o el rol storage.objectViewer de gestión de identidades y accesos.

Si lo has especificado en la conexión, también puedes conceder acceso a la cuenta de servicio de la conexión a Dataproc Metastore y a Dataproc Persistent History Server. Para obtener más información, consulta Conceder acceso a la cuenta de servicio.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

También puede añadir los siguientes argumentos al código anterior:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Haz los cambios siguientes:

  • CUSTOM_SERVICE_ACCOUNT: obligatorio. Una cuenta de servicio personalizada que proporciones.
  • BUCKET_NAME: opcional. El segmento de Cloud Storage que se usa como sistema de archivos predeterminado de la aplicación Spark. Si no se proporciona, se creará un segmento de Cloud Storage predeterminado en el proyecto y todos los trabajos que se ejecuten en el mismo proyecto compartirán el segmento.
  • DATASET: opcional. El conjunto de datos en el que se almacenarán los datos temporales que se generen al invocar el procedimiento. Los datos se limpian una vez que se completa el trabajo. Si no se proporciona, se crea un conjunto de datos temporal predeterminado para el trabajo.

Tu cuenta de servicio personalizada debe tener los siguientes permisos:

  • Para leer y escribir en el bucket de almacenamiento provisional que se usa como sistema de archivos predeterminado de la aplicación Spark, haz lo siguiente:

    • Permisos storage.objects.* o el rol de gestión de identidades y accesos roles/storage.objectAdmin en el segmento de almacenamiento provisional que especifiques.
    • Además, los permisos de storage.buckets.* o el rol de gestión de identidades y accesos roles/storage.Admin en el proyecto si no se especifica el segmento de almacenamiento provisional.
  • (Opcional) Para leer y escribir datos en BigQuery:

    • bigquery.tables.* en tus tablas de BigQuery.
    • bigquery.readsessions.* en tu proyecto.
    • El rol de gestión de identidades y accesos roles/bigquery.admin incluye los permisos anteriores.
  • Opcional: Para leer y escribir datos en Cloud Storage, haz lo siguiente:

    • storage.objects.* o el rol de gestión de identidades y accesos roles/storage.objectAdmin en tus objetos de Cloud Storage.
  • (Opcional) Para leer y escribir en el conjunto de datos de staging que se usa para los parámetros INOUT/OUT:

    • Rol de gestión de identidades y accesos bigquery.tables.* o roles/bigquery.dataEditor en el conjunto de datos de almacenamiento provisional que especifiques.
    • Además, el permiso bigquery.datasets.create o el rol de gestión de identidades y accesos roles/bigquery.dataEditor en el proyecto si no se especifica el conjunto de datos de almacenamiento provisional.

Ejemplos de procedimientos almacenados para Spark

En esta sección se muestran ejemplos de cómo puedes crear un procedimiento almacenado para Apache Spark.

Usar un archivo PySpark o JAR en Cloud Storage

En el siguiente ejemplo se muestra cómo crear un procedimiento almacenado para Spark mediante la conexión my-project-id.us.my-connection y un archivo PySpark o JAR almacenado en un segmento de Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java o Scala

Usa main_file_uri para crear un procedimiento almacenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Usa main_class para crear un procedimiento almacenado:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Usar código insertado

En el siguiente ejemplo se muestra cómo crear un procedimiento almacenado para Spark mediante la conexión my-project-id.us.my-connection y el código PySpark insertado:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Enviar un valor como parámetro de entrada

En los siguientes ejemplos se muestran las dos formas de transferir un valor como parámetro de entrada en Python:

Método 1: Usar variables de entorno

En el código de PySpark, puedes obtener los parámetros de entrada del procedimiento almacenado para Spark a través de variables de entorno en el controlador y los ejecutores de Spark. El nombre de la variable de entorno tiene el formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, donde PARAMETER_NAME es el nombre del parámetro de entrada. Por ejemplo, si el nombre del parámetro de entrada es var, el nombre de la variable de entorno correspondiente es BIGQUERY_PROC_PARAM.var. Los parámetros de entrada están codificados en JSON. En tu código de PySpark, puedes obtener el valor del parámetro de entrada en una cadena JSON de la variable de entorno y decodificarlo en una variable de Python.

En el siguiente ejemplo se muestra cómo obtener el valor de un parámetro de entrada de tipo INT64 en tu código de PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Método 2: Usar una biblioteca integrada

En el código de PySpark, puedes importar una biblioteca integrada y usarla para rellenar todos los tipos de parámetros. Para transferir los parámetros a los ejecutores, rellene los parámetros en un controlador de Spark como variables de Python y transfiera los valores a los ejecutores. La biblioteca integrada admite la mayoría de los tipos de datos de BigQuery, excepto INTERVAL, GEOGRAPHY, NUMERIC y BIGNUMERIC.

Tipo de datos de BigQuery Tipo de datos de Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC No admitido
BIGNUMERIC No compatible
INTERVAL No compatible
GEOGRAPHY No admitido

En el siguiente ejemplo se muestra cómo importar la biblioteca integrada y usarla para rellenar un parámetro de entrada de tipo INT64 y un parámetro de entrada de tipo ARRAY<STRUCT<a INT64, b STRING>> en tu código de PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

En el código Java o Scala, puedes obtener los parámetros de entrada del procedimiento almacenado para Spark a través de las variables de entorno en el controlador y los ejecutores de Spark. El nombre de la variable de entorno tiene el formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, donde PARAMETER_NAME es el nombre del parámetro de entrada. Por ejemplo, si el nombre del parámetro de entrada es var, el nombre de la variable de entorno correspondiente es BIGQUERY_PROC_PARAM.var. En tu código Java o Scala, puedes obtener el valor del parámetro de entrada de la variable de entorno.

En el siguiente ejemplo se muestra cómo obtener el valor de un parámetro de entrada de las variables de entorno en el código de Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

En el siguiente ejemplo se muestra cómo obtener parámetros de entrada de variables de entorno en tu código Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Enviar valores como parámetros OUT y INOUT

Los parámetros de salida devuelven el valor del procedimiento de Spark, mientras que el parámetro INOUT acepta un valor para el procedimiento y devuelve un valor del procedimiento. Para usar los parámetros OUT y INOUT, añada la palabra clave OUT o INOUT antes del nombre del parámetro al crear el procedimiento de Spark. En el código de PySpark, se usa la biblioteca integrada para devolver un valor como parámetro OUT o INOUT. Al igual que los parámetros de entrada, la biblioteca integrada admite la mayoría de los tipos de datos de BigQuery, excepto INTERVAL, GEOGRAPHY, NUMERIC y BIGNUMERIC. Los valores de tipo TIME y DATETIME se convierten a la zona horaria UTC cuando se devuelven como los parámetros OUT o INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Leer de una tabla de Hive Metastore y escribir los resultados en BigQuery

En el siguiente ejemplo se muestra cómo transformar una tabla de Hive Metastore y escribir los resultados en BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="2.2")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Ver filtros de registro

Después de llamar a un procedimiento almacenado de Spark, puedes ver la información del registro. Para obtener la información del filtro de Cloud Logging y el endpoint del clúster del historial de Spark, usa el comando bq show. La información del filtro está disponible en el campo SparkStatistics de la tarea secundaria. Para obtener filtros de registro, sigue estos pasos:

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, enumera los trabajos secundarios del trabajo de secuencias de comandos del procedimiento almacenado:

    bq ls -j --parent_job_id=$parent_job_id

    Para saber cómo obtener el ID de trabajo, consulta Ver los detalles de un trabajo.

    El resultado debería ser similar al siguiente:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifica el jobId de tu procedimiento almacenado y usa el comando bq show para ver los detalles de la tarea:

    bq show --format=prettyjson --job $child_job_id

    Copia el campo sparkStatistics, ya que lo necesitarás en otro paso.

    El resultado debería ser similar al siguiente:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. En Logging, genera filtros de registro con los campos SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Los registros se escriben en el bigquery.googleapis.com/SparkJob recurso monitorizado. Los registros se etiquetan con los componentes INFO, DRIVER y EXECUTOR. Para filtrar los registros del controlador de Spark, añade el componente labels.component = "DRIVER" a los filtros de registro. Para filtrar los registros del ejecutor de Spark, añade el componente labels.component = "EXECUTOR" a los filtros de registro.

Usar la clave de cifrado gestionada por el cliente

El procedimiento de BigQuery Spark usa la clave de cifrado gestionada por el cliente (CMEK) para proteger tu contenido, junto con el cifrado predeterminado que proporciona BigQuery. Para usar la CMEK en el procedimiento de Spark, primero debes activar la creación de la cuenta de servicio de cifrado de BigQuery y conceder los permisos necesarios. El procedimiento de Spark también admite las políticas de organización de CMEK si se aplican a tu proyecto.

Si tu procedimiento almacenado usa el modo de seguridad INVOKER, tu CMEK debe especificarse mediante la variable del sistema SQL al llamar al procedimiento. De lo contrario, tu CMEK se puede especificar a través de la conexión asociada al procedimiento almacenado.

Para especificar la CMEK a través de la conexión al crear un procedimiento almacenado de Spark, usa el siguiente código de ejemplo:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Para especificar la CMEK a través de la variable de sistema SQL al llamar al procedimiento, usa el siguiente código de ejemplo:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Usar Controles de Servicio de VPC

Controles de Servicio de VPC te permite configurar un perímetro seguro para protegerte frente a la filtración externa de datos. Para usar Controles de Servicio de VPC con un procedimiento de Spark y aumentar la seguridad, primero crea un perímetro de servicio.

Para proteger completamente los trabajos de procedimientos de Spark, añade las siguientes APIs al perímetro de servicio:

  • API de BigQuery (bigquery.googleapis.com)
  • API de Cloud Logging (logging.googleapis.com)
  • API de Cloud Storage (storage.googleapis.com), si usas Cloud Storage
  • API de Artifact Registry (artifactregistry.googleapis.com) o API de Container Registry (containerregistry.googleapis.com), si usas un contenedor personalizado
  • API de Dataproc Metastore (metastore.googleapis.com) y API Admin de Cloud Run (run.googleapis.com), si usas Dataproc Metastore

Añade el proyecto de consulta del procedimiento de Spark al perímetro. Añade al perímetro otros proyectos que alojen tu código o datos de Spark.

Prácticas recomendadas

  • Cuando usas una conexión en tu proyecto por primera vez, tarda aproximadamente un minuto más en aprovisionarse. Para ahorrar tiempo, puedes reutilizar una conexión de Spark que ya tengas cuando crees un procedimiento almacenado para Spark.

  • Cuando creas un procedimiento de Spark para usarlo en producción, Google recomienda especificar una versión de tiempo de ejecución. Para ver una lista de las versiones del entorno de ejecución admitidas, consulta Versiones del entorno de ejecución de Serverless para Apache Spark. Te recomendamos que utilices la versión de asistencia a largo plazo (LTS).

  • Cuando especifiques un contenedor personalizado en un procedimiento de Spark, te recomendamos que utilices Artifact Registry y streaming de imágenes.

  • Para mejorar el rendimiento, puede especificar propiedades de asignación de recursos en el procedimiento de Spark. Los procedimientos almacenados de Spark admiten una lista de propiedades de asignación de recursos igual que Serverless para Apache Spark.

Limitaciones

  • Solo puedes usar el protocolo de endpoint gRPC para conectarte a Dataproc Metastore. Aún no se admiten otros tipos de Hive Metastore.
  • Las claves de cifrado gestionadas por el cliente (CMEK) solo están disponibles cuando los clientes crean procedimientos de Spark de una sola región. No se admiten las claves de CMEK de la región global ni las claves de CMEK multirregionales, como EU o US.
  • Solo se pueden transferir parámetros de salida en PySpark.
  • Si el conjunto de datos asociado al procedimiento almacenado de Spark se replica en una región de destino mediante la replicación de conjuntos de datos entre regiones, el procedimiento almacenado solo se podrá consultar en la región en la que se haya creado.
  • Spark no admite el acceso a endpoints HTTP en tu red privada de Controles de Servicio de VPC.

Cuotas y límites

Para obtener información sobre las cuotas y los límites, consulta Procedimientos almacenados de Spark: cuotas y límites.

Siguientes pasos