Utilizzare le stored procedure per Apache Spark

Questo documento è rivolto a data engineer, data scientist e analisti di dati per creare e chiamare procedure memorizzate per Spark in BigQuery.

Con BigQuery puoi creare ed eseguire Spark stored procedure scritte in Python, Java e Scala. Puoi quindi eseguire queste stored procedure in BigQuery utilizzando una query GoogleSQL, in modo simile all'esecuzione di stored procedure SQL.

Prima di iniziare

Per creare una procedura memorizzata per Spark, chiedi all'amministratore di creare una connessione Spark e di condividerla con te. L'amministratore deve anche concedere all'account di servizio associato alla connessione le autorizzazioni Identity and Access Management (IAM) richieste.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per eseguire le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Questi ruoli predefiniti contengono le autorizzazioni necessarie per eseguire le attività descritte in questo documento. Per visualizzare le autorizzazioni esatte richieste, espandi la sezione Autorizzazioni richieste:

Autorizzazioni obbligatorie

Per eseguire le attività descritte in questo documento sono necessarie le seguenti autorizzazioni:

  • Crea una connessione:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crea una stored procedure per Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Chiama una stored procedure per Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Considerazione della posizione

Devi creare una procedura memorizzata per Spark nella stessa posizione della connessione perché la procedura memorizzata viene eseguita nella stessa posizione della connessione. Ad esempio, per creare una procedura memorizzata nella multiregione degli Stati Uniti, devi utilizzare una connessione situata nella multiregione degli Stati Uniti.

Prezzi

  • Gli addebiti per l'esecuzione di procedure Spark su BigQuery sono simili a quelli per l'esecuzione di procedure Spark su Dataproc Serverless. Per ulteriori informazioni, consulta i prezzi di Dataproc Serverless.

  • Le stored procedure Spark possono essere utilizzate con il modello di determinazione dei prezzi on demand e con qualsiasi versione di BigQuery. Gli addebiti per le procedure Spark vengono effettuati utilizzando il modello di pagamento a consumo di BigQuery Enterprise in tutti i casi, indipendentemente dal modello di prezzi di calcolo utilizzato nel progetto.

  • Le stored procedure Spark per BigQuery non supportano l'utilizzo di prenotazioni o impegni. Le prenotazioni e gli impegni esistenti continuano a essere utilizzati per altre query e procedure supportate. Gli addebiti per l'utilizzo delle procedure memorizzate di Spark vengono aggiunti alla fattura per la versione Enterprise al costo a consumo. Gli sconti della tua organizzazione vengono applicati, se applicabili.

  • Sebbene le stored procedure di Spark utilizzino un motore di esecuzione Spark, non vedrai addebiti separati per l'esecuzione di Spark. Come indicato, gli addebiti corrispondenti vengono registrati come SKU pay-as-you-go di BigQuery Enterprise.

  • Le stored procedure Spark non offrono un livello gratuito.

Creare una stored procedure per Spark

Devi creare la procedura memorizzata nella stessa posizione della connessione che utilizzi.

Se il corpo della procedura memorizzata è superiore a 1 MB, ti consigliamo di inserirla in un file in un bucket Cloud Storage anziché utilizzare il codice in linea. BigQuery fornisce due metodi per creare una stored procedure per Spark utilizzando Python:

Utilizzare l'editor di query SQL

Per creare una stored procedure per Spark nell'editor query SQL:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor delle query, aggiungi il codice campione per l'istruzioneCREATE PROCEDURE visualizzata.

    In alternativa, nel riquadro Explorer, fai clic sulla connessione nel progetto che hai utilizzato per creare la risorsa di connessione. Per creare una procedura memorizzata per Spark, fai clic su Crea procedura memorizzata.

    Python

    Per creare stored procedure per Spark in Python, utilizza il seguente codice campione:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java o Scala

    Per creare una procedura memorizzata per Spark in Java o Scala con l'opzione main_file_uri, utilizza il seguente codice campione:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Per creare una procedura memorizzata per Spark in Java o Scala con le opzioni main_class e jar_uris, utilizza il seguente codice campione:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Sostituisci quanto segue:

    • PROJECT_ID: il progetto in cui vuoi creare la procedura memorizzata, ad esempio myproject.
    • DATASET: il set di dati in cui vuoi creare la procedura memorizzata, ad esempio mydataset.
    • PROCEDURE_NAME: il nome della procedura memorizzata che vuoi eseguire in BigQuery, ad esempio mysparkprocedure.
    • PROCEDURE_ARGUMENT: un parametro per inserire gli argomenti di input.

      In questo parametro, specifica i seguenti campi:

      • ARGUMENT_MODE: la modalità dell'argomento.

        I valori validi sono IN, OUT e INOUT. Per impostazione predefinita, il valore è IN.

      • ARGUMENT_NAME: il nome dell'argomento.
      • ARGUMENT_TYPE: il tipo di argomento.

      Ad esempio: myproject.mydataset.mysparkproc(num INT64).

      Per ulteriori informazioni, consulta la sezione Passare un valore come parametro IN o i parametri OUT e INOUT in questo documento.

    • CONNECTION_PROJECT_ID: il progetto che contiene la connessione per eseguire la procedura Spark.
    • CONNECTION_REGION: la regione che contiene la connessione per eseguire la procedura Spark, ad esempio us.
    • CONNECTION_ID: l'ID connessione, ad esempio myconnection.

      Quando visualizzi i dettagli della connessione nella console Google Cloud , l'ID connessione è il valore nell'ultima sezione dell'ID connessione completo visualizzato in ID connessione, ad esempio projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: la versione del runtime di Spark, ad esempio 1.1.
    • MAIN_PYTHON_FILE_URI: il percorso di un file PySpark, ad esempio gs://mybucket/mypysparkmain.py.

      In alternativa, se vuoi aggiungere il corpo della procedura memorizzata nell'istruzione CREATE PROCEDURE, aggiungi PYSPARK_CODE dopo LANGUAGE PYTHON AS come mostrato nell'esempio in Utilizzare il codice in linea in questo documento.

    • PYSPARK_CODE: la definizione di un'applicazione PySpark nell'istruzione CREATE PROCEDURE se vuoi passare il corpo della procedura in linea.

      Il valore è un valore letterale di stringa. Se il codice include virgolette e barre rovesciate, queste devono essere sottoposte a esegui l'escape o rappresentate come stringa non elaborata. Ad esempio, il codice restituito "\n"; può essere rappresentato come uno dei seguenti:

      • Stringa tra virgolette: "return \"\\n\";". Sia le virgolette sia le barre di sbarramento sono precedute da un carattere di escape.
      • Stringa tra tripli apici: """return "\\n";""". Le barre verticali vengono evase, mentre le virgolette no.
      • Stringa non elaborata: r"""return "\n";""". Non è necessario eseguire escape.
      Per scoprire come aggiungere codice PySpark in linea, consulta Utilizzare il codice in linea.
    • MAIN_JAR_URI: il percorso del file JAR contenente la classe main, ad esempio gs://mybucket/my_main.jar.
    • CLASS_NAME: il nome completo di una classe in un file JAR impostato con l'opzione jar_uris, ad esempio com.example.wordcount.
    • URI: il percorso del file JAR contenente la classe specificata nella classe main, ad esempio gs://mybucket/mypysparkmain.jar.

    Per altre opzioni che puoi specificare in OPTIONS, consulta l'elenco delle opzioni della procedura.

Utilizzare l'editor PySpark

Quando crei una procedura utilizzando l'editor PySpark, non devi utilizzare l'istruzioneCREATE PROCEDURE. Aggiungi invece il codice Python direttamente nell'editor Pyspark e salvalo o eseguilo.

Per creare una stored procedure per Spark nell'editor PySpark, segui questi passaggi:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Se vuoi digitare direttamente il codice PySpark, apri l'editor PySpark. Per aprire l'editor PySpark, fai clic sul menu accanto a Crea query SQL e seleziona Crea procedura PySpark.

  3. Per impostare le opzioni, fai clic su Altro > Opzioni PySpark e poi segui questi passaggi:

    1. Specifica la posizione in cui vuoi eseguire il codice PySpark.

    2. Nel campo Connection (Connessione), specifica la connessione Spark.

    3. Nella sezione Chiamata stored procedure, specifica il set di dati in cui vuoi archiviare le stored procedure temporanee generate. Puoi impostare un set di dati specifico o consentire l'utilizzo di un set di dati temporaneo per richiamare il codice PySpark.

      Il set di dati temporaneo viene generato con la posizione specificata nel passaggio precedente. Se è specificato un nome del set di dati, assicurati che il set di dati e la connessione Spark debbano trovarsi nella stessa posizione.

    4. Nella sezione Parametri, definisci i parametri per la procedura memorizzata. Il valore del parametro viene utilizzato solo durante le esecuzioni in-session del codice PySpark, ma la dichiarazione stessa viene memorizzata nella procedura.

    5. Nella sezione Opzioni avanzate, specifica le opzioni della procedura. Per un elenco dettagliato delle opzioni della procedura, consulta l'elenco delle opzioni della procedura.

    6. Nella sezione Proprietà, aggiungi le coppie chiave-valore per configurare il job. Puoi utilizzare una delle coppie chiave-valore delle proprietà Spark di Dataproc Serverless.

    7. In Impostazioni account di servizio, specifica l'account di servizio personalizzato, il set di dati di staging, il set di dati di staging e la cartella Cloud Storage di staging da utilizzare durante le esecuzioni in-session del codice PySpark.

    8. Fai clic su Salva.

Salvare una stored procedure per Spark

Dopo aver creato la stored procedure utilizzando l'editor PySpark, puoi salvarla. Per farlo, segui questi passaggi:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, crea una stored procedure per Spark utilizzando Python con l'editor PySpark.

  3. Fai clic su Salva > Salva procedura.

  4. Nella finestra di dialogo Salva stored procedure, specifica il nome del set di dati in cui vuoi memorizzare la stored procedure e il nome della stored procedure.

  5. Fai clic su Salva.

    Se vuoi solo eseguire il codice PySpark anziché salvarlo come stored procedure, puoi fare clic su Esegui anziché su Salva.

Utilizzo di container personalizzati

Il container personalizzato fornisce l'ambiente di runtime per i processi di esecuzione e del driver del carico di lavoro. Per utilizzare i container personalizzati, utilizza il seguente codice campione:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Sostituisci quanto segue:

  • PROJECT_ID: il progetto in cui vuoi creare la procedura memorizzata, ad esempio myproject.
  • DATASET: il set di dati in cui vuoi creare la procedura memorizzata, ad esempio mydataset.
  • PROCEDURE_NAME: il nome della procedura memorizzata che vuoi eseguire in BigQuery, ad esempio mysparkprocedure.
  • PROCEDURE_ARGUMENT: un parametro per inserire gli argomenti di input.

    In questo parametro, specifica i seguenti campi:

    • ARGUMENT_MODE: la modalità dell'argomento.

      I valori validi sono IN, OUT e INOUT. Per impostazione predefinita, il valore è IN.

    • ARGUMENT_NAME: il nome dell'argomento.
    • ARGUMENT_TYPE: il tipo di argomento.

    Ad esempio: myproject.mydataset.mysparkproc(num INT64).

    Per saperne di più, consulta la sezione Passare un valore come parametro IN o i parametri OUT e INOUT in questo documento.

  • CONNECTION_PROJECT_ID: il progetto contenente la connessione per eseguire la procedura Spark.
  • CONNECTION_REGION: la regione che contiene la connessione per eseguire la procedura Spark, ad esempio us.
  • CONNECTION_ID: l'ID connessione, ad esempio myconnection.

    Quando visualizzi i dettagli della connessione nella console Google Cloud , l'ID connessione è il valore nell'ultima sezione dell'ID connessione visualizzato, ad esempio projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: la versione del runtime di Spark, ad esempio 1.1.
  • MAIN_PYTHON_FILE_URI: il percorso di un file PySpark, ad esempio gs://mybucket/mypysparkmain.py.

    In alternativa, se vuoi aggiungere il corpo della procedura memorizzata nell'istruzione CREATE PROCEDURE, aggiungi PYSPARK_CODE dopo LANGUAGE PYTHON AS come mostrato nell'esempio in Utilizzare il codice in linea in questo documento.

  • PYSPARK_CODE: la definizione di un'applicazione PySpark nell'istruzione CREATE PROCEDURE se vuoi passare il corpo della procedura in linea.

    Il valore è un valore letterale di stringa. Se il codice include virgolette e barre diagonali, queste devono essere inserite in modo appropriato o rappresentate come stringa non elaborata. Ad esempio, il codice restituito "\n"; può essere rappresentato come uno dei seguenti:

    • Stringa tra virgolette: "return \"\\n\";". Sia le virgolette sia le barre di sbarramento sono precedute da un carattere di escape.
    • Stringa tra tripli apici: """return "\\n";""". Le barre oblique vengono evase, mentre le virgolette no.
    • Stringa non elaborata: r"""return "\n";""". Non è necessario eseguire escape.
    Per scoprire come aggiungere codice PySpark in linea, consulta Utilizzare il codice in linea.
  • CONTAINER_IMAGE: percorso dell'immagine in Artifact Registry. Deve contenere solo le librerie da utilizzare nella procedura. Se non specificato, viene utilizzata l'immagine contenitore predefinita del sistema associata alla versione del runtime.

Per saperne di più su come creare un'immagine container personalizzata con Spark, consulta Creare un'immagine container personalizzata.

Chiama una stored procedure per Spark

Dopo aver creato una procedura memorizzata, puoi invocarla utilizzando una delle seguenti opzioni:

Console

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, espandi il progetto e seleziona la procedura memorizzata per Spark che vuoi eseguire.

  3. Nella finestra Informazioni stored procedure, fai clic su Chiama stored procedure. In alternativa, puoi espandere l'opzione Visualizza azioni e fare clic su Esegui.

  4. Fai clic su Esegui.

  5. Nella sezione Tutti i risultati, fai clic su Visualizza risultati.

  6. (Facoltativo) Nella sezione Risultati delle query:

    • Se vuoi visualizzare i log del driver Spark, fai clic su Dettagli esecuzione.

    • Se vuoi visualizzare i log in Cloud Logging, fai clic su Informazioni sul job e poi fai clic su log nel campo Log.

    • Se vuoi ottenere l'endpoint del server di cronologia Spark, fai clic su Informazioni sul job e poi su Server di cronologia Spark.

SQL

Per chiamare una stored procedure, utilizza l'CALL PROCEDURE istruzione:

  1. Nella console Google Cloud , vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, inserisci la seguente istruzione:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Fai clic su Esegui.

Per ulteriori informazioni su come eseguire query, consulta Eseguire una query interattiva.

Utilizzo di un account di servizio personalizzato

Anziché utilizzare l'identità di servizio della connessione Spark per accedere ai dati, puoi utilizzare un account di servizio personalizzato per accedere ai dati all'interno del codice Spark.

Per utilizzare un account di servizio personalizzato, specifica la modalità di sicurezza INVOKER (utilizzando l'istruzione EXTERNAL SECURITY INVOKER) quando crei una procedura memorizzata Spark e specifica l'account di servizio quando richiami la procedura memorizzata.

Se vuoi accedere e utilizzare il codice Spark da Cloud Storage, devi concedere le autorizzazioni necessarie all'identità di servizio della connessione Spark. Devi concedere all'account di servizio della connessione l'autorizzazione IAM storage.objects.get o il ruolo IAM storage.objectViewer.

Se vuoi, puoi concedere all'account di servizio della connessione l'accesso a Dataproc Metastore e al server di cronologia permanente Dataproc se li hai specificati nella connessione. Per ulteriori informazioni, consulta Concedere l'accesso all'account di servizio.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Se vuoi, puoi aggiungere i seguenti argomenti al codice precedente:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Sostituisci quanto segue:

  • CUSTOM_SERVICE_ACCOUNT: obbligatorio. Un account di servizio personalizzato fornito da te.
  • BUCKET_NAME: facoltativo. Il bucket Cloud Storage utilizzato come file system predefinito dell'applicazione Spark. Se non viene fornito, nel progetto viene creato un bucket Cloud Storage predefinito che viene condiviso da tutti i job in esecuzione nello stesso progetto.
  • DATASET: facoltativo. Il set di dati in cui archiviare i dati temporanei prodotti dall'invocazione della procedura. I dati vengono ripuliti al termine del job. Se non viene fornito, per il job viene creato un set di dati temporaneo predefinito.

Il tuo account di servizio personalizzato deve avere le seguenti autorizzazioni:

  • Per leggere e scrivere nel bucket di staging utilizzato come file system dell'applicazione Spark predefinito:

    • autorizzazioni storage.objects.* o il ruolo IAM roles/storage.objectAdmin nel bucket di staging specificato.
    • Inoltre, le autorizzazioni storage.buckets.* o il ruolo IAM roles/storage.Admin nel progetto se il bucket di staging non è specificato.
  • (Facoltativo) Per leggere e scrivere dati da e verso BigQuery:

    • bigquery.tables.* nelle tabelle BigQuery.
    • bigquery.readsessions.* nel tuo progetto.
    • Il ruolo IAM roles/bigquery.admin include le autorizzazioni precedenti.
  • (Facoltativo) Per leggere e scrivere dati da e verso Cloud Storage:

    • storage.objects.* o il ruolo IAM roles/storage.objectAdmin per gli oggetti Cloud Storage.
  • (Facoltativo) Per leggere e scrivere nel set di dati di staging utilizzato per i parametri INOUT/OUT:

    • Ruolo IAM bigquery.tables.* o roles/bigquery.dataEditor nel set di dati di staging specificato.
    • Inoltre, l'autorizzazione bigquery.datasets.create o il ruolo IAM roles/bigquery.dataEditor nel progetto se il set di dati di staging non è specificato.

Esempi di stored procedure per Spark

Questa sezione mostra esempi di come creare una stored procedure per Apache Spark.

Utilizzare un file JAR o PySpark in Cloud Storage

L'esempio seguente mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection e un file PySpark o JAR archiviato in un bucket Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java o Scala

Usa main_file_uri per creare una stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Usa main_class per creare una stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Utilizzare il codice in linea

L'esempio seguente mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection e il codice PySpark in linea:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Passare un valore come parametro di input

Gli esempi seguenti mostrano i due metodi per passare un valore come parametro di input in Python:

Metodo 1: utilizza le variabili di ambiente

Nel codice PySpark, puoi ottenere i parametri di input della stored procedure per Spark tramite le variabili di ambiente nel driver e negli executor di Spark. Il nome della variabile di ambiente ha il formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, dove PARAMETER_NAME è il nome del parametro di input. Ad esempio, se il nome del parametro di input è var, il nome della variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var. I parametri di input sono codificati in JSON. Nel codice PySpark, puoi ottenere il valore parametro di input in una stringa JSON dalla variabile di ambiente e decodificarlo in una variabile Python.

L'esempio seguente mostra come ottenere il valore di un parametro di input di tipo INT64 nel codice PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Metodo 2: utilizza una libreria integrata

Nel codice PySpark, puoi semplicemente importare una libreria integrata e utilizzarla per compilare tutti i tipi di parametri. Per passare i parametri agli executor, compilali in un driver Spark come variabili Python e passa i valori agli executor. La libreria integrata supporta la maggior parte dei tipi di dati BigQuery, ad eccezione di INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC.

Tipo di dati BigQuery Tipo di dati Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Non supportato
BIGNUMERIC Non supportato
INTERVAL Non supportato
GEOGRAPHY Non supportato

L'esempio seguente mostra come importare la libreria integrata e utilizzarla per compilare un parametro di input di tipo INT64 e un parametro di input di tipo ARRAY<STRUCT<a INT64, b STRING>> nel codice PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Nel codice Java o Scala, puoi ottenere i parametri di input della procedura memorizzata per Spark tramite le variabili di ambiente nel driver e negli executor di Spark. Il nome della variabile di ambiente ha il formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, dove PARAMETER_NAME è il nome del parametro di input. Ad esempio, se il nome del parametro di input è var, il nome della variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var. Nel codice Java o Scala, puoi ottenere il valore parametro di input dalla variabile di ambiente.

Il seguente esempio mostra come ottenere il valore di un parametro di input dalle variabili di ambiente nel codice Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

Il seguente esempio mostra come recuperare i parametri di input dalle variabili di ambiente nel codice Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Passare i valori come parametri OUT e INOUT

I parametri di output restituiscono il valore della procedura Spark, mentre il parametro INOUT accetta un valore per la procedura e restituisce un valore dalla procedura. Per utilizzare i parametri OUT e INOUT, aggiungi la parola chiave OUT o INOUT prima del nome del parametro quando crei la procedura Spark. Nel codice PySpark, utilizzi la libreria integrata per restituire un valore come parametro OUT o INOUT. Come per i parametri di input, la libreria integrata supporta la maggior parte dei tipi di dati BigQuery, ad eccezione di INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC. I valori di tipo TIME e DATETIME vengono convertiti nel fuso orario UTC quando vengono restituiti come parametri OUT o INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Leggi da una tabella Hive Metastore e scrivi i risultati in BigQuery

L'esempio seguente mostra come trasformare una tabella Hive Metastore e scrivere i risultati in BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Visualizza i filtri di log

Dopo aver chiamata una stored procedure per Spark, puoi visualizzare le informazioni del log. Per ottenere le informazioni sui filtri di Cloud Logging e sull'endpoint del cluster di cronologia Spark, utilizza il comando bq show. Le informazioni sul filtro sono disponibili nel campo SparkStatistics del job secondario. Per ottenere i filtri dei log:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor di query, elenca i job secondari del job script della procedura memorizzata:

    bq ls -j --parent_job_id=$parent_job_id

    Per scoprire come ottenere l'ID job, consulta Visualizzare i dettagli del job.

    L'output è simile al seguente:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifica il jobId per la stored procedure e utilizza il comando bq show per visualizzare i dettagli del job:

    bq show --format=prettyjson --job $child_job_id

    Copia il campo sparkStatistics perché ti servirà in un altro passaggio.

    L'output è simile al seguente:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Per il logging, genera filtri dei log con i campi SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    I log vengono scritti nella risorsa monitorata bigquery.googleapis.com/SparkJob. I log sono etichettati dai componenti INFO, DRIVER e EXECUTOR. Per filtrare i log del driver Spark, aggiungi il componente labels.component = "DRIVER" ai filtri dei log. Per filtrare i log dall'executor Spark, aggiungi il componente labels.component = "EXECUTOR" ai filtri dei log.

Utilizza la chiave di crittografia gestita dal cliente

La procedura BigQuery Spark utilizza la chiave di crittografia gestita dal cliente (CMEK) per proteggere i tuoi contenuti, insieme alla crittografia predefinita fornita da BigQuery. Per utilizzare la chiave CMEK nella procedura Spark, attiva prima la creazione dell'account di servizio di crittografia BigQuery e concedi le autorizzazioni richieste. La procedura Spark supporta anche i criteri dell'organizzazione CMEK, se applicati al progetto.

Se la procedura memorizzata utilizza la modalità di sicurezza INVOKER, la chiave CMEK deve essere specificata tramite la variabile di sistema SQL quando viene chiamata la procedura. In caso contrario, il CMEK può essere specificato tramite la connessione associata alla procedura memorizzata.

Per specificare la CMEK tramite la connessione quando crei una stored procedure Spark, utilizza il seguente codice campione:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Per specificare CMEK tramite la variabile di sistema SQL quando chiami la procedura, utilizza il seguente codice campione:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Utilizzare i Controlli di servizio VPC

I Controlli di servizio VPC ti consentono di configurare un perimetro sicuro per proteggerti dall'esfiltrazione di dati. Per utilizzare i Controlli di servizio VPC con una procedura Spark per una maggiore sicurezza, devi prima creare un perimetro di servizio.

Per proteggere completamente i job di procedura Spark, aggiungi le seguenti API al perimetro di servizio:

  • API BigQuery (bigquery.googleapis.com)
  • API Cloud Logging (logging.googleapis.com)
  • API Cloud Storage (storage.googleapis.com), se utilizzi Cloud Storage
  • API Artifact Registry (artifactregistry.googleapis.com) o API Container Registry (containerregistry.googleapis.com), se utilizzi un contenitore personalizzato
  • API Dataproc Metastore (metastore.googleapis.com) e API Cloud Run Admin (run.googleapis.com), se utilizzi Dataproc Metastore

Aggiungi il progetto di query della procedura Spark al perimetro. Aggiungi altri progetti che ospitano il codice o i dati Spark nel perimetro.

Best practice

  • Quando utilizzi una connessione nel progetto per la prima volta, il provisioning richiede circa un minuto in più. Per risparmiare tempo, puoi riutilizzare una connessione Spark esistente quando crei una stored procedure per Spark.

  • Quando crei una procedura Spark per l'utilizzo in produzione, Google consiglia di specificare una versione di runtime. Per un elenco delle versioni del runtime supportate, consulta Versioni del runtime Dataproc Serverless. Ti consigliamo di utilizzare la versione LTS (Long-Time-Support).

  • Quando specifichi un contenitore personalizzato in una procedura Spark, ti consigliamo di utilizzare Artifact Registry e lo streaming di immagini.

  • Per un rendimento migliore, puoi specificare proprietà di allocazione delle risorse nella procedura Spark. Le procedure memorizzate Spark supportano un elenco di proprietà di allocazione delle risorse come Dataproc Serverless.

Limitazioni

  • Puoi utilizzare il protocollo dell'endpoint gRPC solo per connetterti a Dataproc Metastore. Altri tipi di Hive Metastore non sono ancora supportati.
  • Le chiavi di crittografia gestite dal cliente (CMEK) sono disponibili solo quando i clienti creano procedure Spark a una sola regione. Le chiavi CMEK per regioni globali e le chiavi CMEK multiregionali, ad esempio EU o US, non sono supportate.
  • Il passaggio dei parametri di output è supportato solo per PySpark.
  • Se il set di dati associato alla procedura memorizzata per Spark viene replicato in una regione di destinazione tramite la replica dei set di dati tra regioni, la procedura memorizzata può essere sottoposta a query solo nella regione in cui è stata creata.
  • Spark non supporta l'accesso agli endpoint HTTP nella rete privata di Controlli di servizio VPC.

Quote e limiti

Per informazioni su quote e limiti, consulta le procedure memorizzate per quote e limiti di Spark.

Passaggi successivi