Lavora con le stored procedure per Apache Spark

Questo documento è rivolto a data engineer, data scientist e analisti di dati creare e chiamare stored procedure per Spark in BigQuery.

Con BigQuery, puoi creare ed eseguire Spark le stored procedure scritte in Python, Java e Scala. Puoi quindi eseguire le stored procedure in BigQuery utilizzando un prompt in modo simile all'esecuzione delle procedure archiviate SQL.

Prima di iniziare

Per creare una stored procedure per Spark, chiedi al tuo per creare una connessione Spark e la condividiamo con te. L'amministratore deve anche concedere l'account di servizio associate alla connessione le autorizzazioni IAM (Identity and Access Management) richieste.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per eseguire le attività in questo documento, chiedi all'amministratore di concederti seguenti ruoli IAM:

Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso.

Questi ruoli predefiniti le autorizzazioni necessarie per eseguire le attività in questo documento. Per vedere le autorizzazioni esatte obbligatorie, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per eseguire le attività descritte in questo documento sono necessarie le seguenti autorizzazioni:

  • Crea una connessione:
    • bigquery.connections.create
    • bigquery.connections.list
  • Crea una stored procedure per Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Chiama una stored procedure per Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Potresti anche riuscire a ottenere queste autorizzazioni con ruoli personalizzati e altri ruoli predefiniti.

Considerazione della località

Devi creare una stored procedure per Spark nella stessa posizione della tua connessione poiché la stored procedure viene eseguita nella stessa località della connessione. Per Ad esempio, per creare una stored procedure in più regioni degli Stati Uniti, utilizza un situata nella località multiregionale degli Stati Uniti.

Prezzi

Crea una stored procedure per Spark

Devi creare la stored procedure nella stessa località della connessione che utilizzi.

Se il corpo della stored procedure è superiore a 1 MB, consigliamo di inserire la stored procedure in un file in un nel bucket Cloud Storage anziché utilizzare il codice in linea. BigQuery offre due metodi per creare una stored procedure per Spark utilizzando Python:

Utilizza l'editor di query SQL

Creare una stored procedure per Spark nella query SQL editor, procedi nel seguente modo:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor query, aggiungi il codice campione per la CREATE PROCEDURE dichiarazione che viene visualizzata.

    In alternativa, nel riquadro Explorer, fai clic sulla connessione nel progetto utilizzato per creare la risorsa di connessione. Quindi, per creare un'istanza per Spark, fai clic su Crea stored procedure.

    Python

    Per creare una stored procedure per Spark in Python, utilizza il seguente codice campione:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java o Scala

    Per creare una stored procedure per Spark in Java Scala con l'opzione main_file_uri, usa il seguente codice campione:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Per creare una stored procedure per Spark in Java Scala con le opzioni main_class e jar_uris, usa il seguente codice campione:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Sostituisci quanto segue:

    • PROJECT_ID: il valore nel progetto in cui vuoi creare come myproject.
    • DATASET: il set di dati in cui vuoi creare la stored procedure, ad esempio mydataset.
    • PROCEDURE_NAME: il nome del stored procedure che vuoi eseguire in BigQuery, ad esempio mysparkprocedure.
    • PROCEDURE_ARGUMENT: un parametro per inserisci gli argomenti di input.

      In questo parametro, specifica i seguenti campi:

      • ARGUMENT_MODE: la modalità dell'argomento.

        I valori validi sono IN, OUT e INOUT. Per impostazione predefinita, il valore è IN.

      • ARGUMENT_NAME: il nome dell'argomento.
      • ARGUMENT_TYPE: il tipo di argomento.

      Ad esempio: myproject.mydataset.mysparkproc(num INT64).

      Per ulteriori informazioni informazioni, consulta la sezione per passare un valore come parametro IN o i parametri OUT e INOUT in questo documento.

    • CONNECTION_PROJECT_ID: il progetto che contiene la connection per eseguire Procedura Spark.
    • CONNECTION_REGION: la regione che contiene la connessione per eseguire Spark come us.
    • CONNECTION_ID: il valore ID connessione, ad esempio myconnection.

      Quando visualizzi i dettagli della connessione nella console Google Cloud, l'ID connessione è il valore riportato nell'ultima sezione ID connessione completo mostrato in ID connessione, ad esempio projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: la versione runtime di Spark, ad esempio 1.1.
    • MAIN_PYTHON_FILE_URI: il percorso di un il file PySpark, ad esempio gs://mybucket/mypysparkmain.py.

      In alternativa, se vuoi aggiungere il corpo della stored procedure in l'istruzione CREATE PROCEDURE, quindi aggiungi PYSPARK_CODE dopo LANGUAGE PYTHON AS, come mostrato nell'esempio Utilizza il codice in linea in questo documento.

    • PYSPARK_CODE: il valore definizione di un'applicazione PySpark nell'istruzione CREATE PROCEDURE se vuoi passare il corpo della procedura in linea.

      Il valore è un valore letterale stringa. Se il codice include la citazione e barre rovesciate, devono essere rappresentati da caratteri di escape o rappresentati come stringa. Ad esempio, è possibile rappresentare il codice restituito "\n"; in uno dei seguenti modi:

      • Stringa tra virgolette: "return \"\\n\";". Entrambe le virgolette e le barre rovesciate sono precedute dal carattere di escape.
      • Stringa con tre virgolette: """return "\\n";""". Barre rovesciate sono caratteri di escape, mentre le virgolette non lo sono.
      • Stringa non elaborata: r"""return "\n";""". Non sono necessari caratteri di escape.
      Per informazioni su come aggiungere il codice PySpark in linea, consulta Utilizzare il codice in linea.
    • MAIN_JAR_URI: il percorso del file JAR che contiene la classe main, ad esempio gs://mybucket/my_main.jar.
    • CLASS_NAME: il nome completo di una classe in un JAR impostato con l'opzione jar_uris, ad esempio com.example.wordcount.
    • URI: il percorso del file JAR contenente la classe specificata nella classe main, ad esempio gs://mybucket/mypysparkmain.jar.

    Per ulteriori opzioni che puoi specificare in OPTIONS, consulta le elenco di opzioni della procedura.

Utilizzare l'editor PySpark

Quando crei una procedura utilizzando l'editor PySpark, non è necessario utilizzare CREATE PROCEDURE. Aggiungi invece il tuo codice Python direttamente editor Pyspark e salvare o eseguire il codice.

Creare una stored procedure per Spark in PySpark editor, procedi nel seguente modo:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Se vuoi digitare direttamente il codice PySpark, apri l'editor PySpark. Per aprire il Editor PySpark, fai clic sull'icona menu accanto a Crea query SQL, e poi seleziona Crea procedura PySpark.

  3. Per impostare le opzioni, fai clic su Altro > Opzioni PySpark e poi le seguenti:

    1. Specifica la posizione in cui vuoi eseguire il codice PySpark.

    2. Nel campo Connessione, specifica la connessione Spark.

    3. Nella sezione Chiamata di stored procedure, specifica il set di dati in in cui archiviare le stored procedure temporanee generati. Puoi impostare un set di dati specifico o consentire l'uso di un set di dati temporaneo per richiamare il codice PySpark.

      Il set di dati temporaneo viene generato con la località specificata nel passaggio precedente. Se viene specificato il nome di un set di dati, assicurati che e Spark devono trovarsi nella stessa località.

    4. Nella sezione Parametri, definisci i parametri per il stored procedure. La del parametro viene utilizzato solo durante le esecuzioni in-session del del codice PySpark, ma la dichiarazione stessa è archiviata nella procedura.

    5. Nella sezione Opzioni avanzate, specifica le opzioni della procedura. Per un elenco dettagliato delle opzioni della procedura, vedi elenco opzioni procedura.

    6. Nella sezione Proprietà, aggiungi le coppie chiave-valore alla configurare il job. Puoi utilizzare qualsiasi coppia chiave-valore del Proprietà Spark serverless di Dataproc.

    7. In Impostazioni account di servizio, specifica l'account di servizio personalizzato. CMEK, set di dati gestione temporanea e cartella di Cloud Storage temporanea durante le esecuzioni all'interno della sessione del codice PySpark.

    8. Fai clic su Salva.

Salva una stored procedure per Spark

Dopo aver creato la stored procedure utilizzando l'editor PySpark, puoi salvare la stored procedure. Per farlo, segui questi passaggi:

  1. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor query, crea una stored procedure per Spark utilizzando Python con l'editor PySpark.

  3. Fai clic su Salva > Salva procedura.

  4. Nella finestra di dialogo Save stored procedure, specifica il nome del set di dati in cui vuoi archiviare la stored procedure e il nome della stored procedure.

  5. Fai clic su Salva.

    Se vuoi eseguire il codice PySpark invece di salvarlo come archivio puoi fare clic su Esegui anziché su Salva.

Utilizzo di container personalizzati

Il container personalizzato fornisce l'ambiente di runtime per il driver del carico di lavoro e i processi esecutori. Per utilizzare i container personalizzati, usa il codice campione seguente:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Sostituisci quanto segue:

  • PROJECT_ID: il progetto in cui vuoi creare la stored procedure, ad esempio myproject.
  • DATASET: il set di dati in cui vuoi creare la stored procedure, ad esempio mydataset.
  • PROCEDURE_NAME: il nome della stored procedure che vuoi eseguire in BigQuery, ad esempio mysparkprocedure.
  • PROCEDURE_ARGUMENT: un parametro per inserire gli argomenti di input.

    In questo parametro, specifica i seguenti campi:

    • ARGUMENT_MODE: la modalità dell'argomento.

      I valori validi sono IN, OUT e INOUT. Per impostazione predefinita, il valore è IN.

    • ARGUMENT_NAME: il nome dell'argomento.
    • ARGUMENT_TYPE: il tipo di argomento.

    Ad esempio: myproject.mydataset.mysparkproc(num INT64).

    Per saperne di più, consulta la sezione Passare un valore come parametro IN o i parametri OUT e INOUT in questo documento.

  • CONNECTION_PROJECT_ID: il progetto che contiene la connessione per eseguire la procedura Spark.
  • CONNECTION_REGION: la regione contenente la connessione per eseguire la procedura Spark, ad esempio us.
  • CONNECTION_ID: l'ID connessione, ad esempio myconnection.

    Quando visualizzi i dettagli della connessione nella console Google Cloud, l'ID connessione è il valore riportato nell'ultima sezione ID connessione completo mostrato in ID connessione, ad esempio projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: la versione runtime di Spark, ad esempio 1.1.
  • MAIN_PYTHON_FILE_URI: il percorso di un il file PySpark, ad esempio gs://mybucket/mypysparkmain.py.

    In alternativa, se vuoi aggiungere il corpo della stored procedure in l'istruzione CREATE PROCEDURE, quindi aggiungi PYSPARK_CODE dopo LANGUAGE PYTHON AS, come mostrato nell'esempio Utilizza il codice in linea in questo documento.

  • PYSPARK_CODE: il valore definizione di un'applicazione PySpark nell'istruzione CREATE PROCEDURE se vuoi passare il corpo della procedura in linea.

    Il valore è un valore letterale stringa. Se il codice include la citazione e barre rovesciate, devono essere rappresentati da caratteri di escape o rappresentati come stringa. Ad esempio, è possibile rappresentare il codice restituito "\n"; in uno dei seguenti modi:

    • Stringa tra virgolette: "return \"\\n\";". Entrambe le virgolette e le barre rovesciate sono precedute dal carattere di escape.
    • Stringa con tre virgolette: """return "\\n";""". Barre rovesciate sono caratteri di escape, mentre le virgolette non lo sono.
    • Stringa non elaborata: r"""return "\n";""". Non sono necessari caratteri di escape.
    Per informazioni su come aggiungere il codice PySpark in linea, consulta Utilizzare il codice in linea.
  • CONTAINER_IMAGE: percorso dell'immagine nel registro di artefatti. Deve contenere solo librerie da utilizzare nella procedura. Se non specificata, viene utilizzata l'immagine del container predefinito di sistema associata alla versione del runtime.

Per saperne di più su come creare un'immagine container personalizzata Spark, consulta Creare un'immagine container personalizzata.

Chiama una stored procedure per Spark

Dopo aver creato una stored procedure, può chiamarlo utilizzando una delle seguenti opzioni:

Console

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, espandi il progetto e seleziona per Spark che vuoi eseguire.

  3. Nella finestra Informazioni sulla stored procedure, fai clic su Richiama stored procedure. In alternativa, puoi espandere l'opzione Visualizza azioni e fare clic su Richiama.

  4. Fai clic su Esegui.

  5. Nella sezione Tutti i risultati, fai clic su Visualizza risultati.

  6. (Facoltativo) Nella sezione Risultati delle query, segui questi passaggi:

    • Per visualizzare i log del driver Spark, fai clic su Dettagli esecuzione.

    • Se vuoi visualizzare i log in Cloud Logging, fai clic su Informazioni job, quindi nel campo Log fai clic su log.

    • Se vuoi ottenere l'endpoint di Spark History Server, fai clic su Job informazioni, quindi fai clic su Server cronologia Spark.

SQL

Per chiamare una stored procedure, utilizza CALL PROCEDURE istruzione:

  1. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor query, inserisci la seguente istruzione:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. Fai clic su Esegui.

Per ulteriori informazioni su come eseguire le query, vedi Eseguire una query interattiva.

Utilizzo di un account di servizio personalizzato

Anziché utilizzare l'identità del servizio di connessione Spark per accesso ai dati, puoi utilizzare un account di servizio personalizzato per accedere ai dati Codice Spark.

Per utilizzare un account di servizio personalizzato, specifica la modalità di sicurezza INVOKER (utilizzando il metodo EXTERNAL SECURITY INVOKER) quando crei un'istruzione stored procedure Spark e specifica l'account di servizio quando richiami la stored procedure.

Per accedere al codice Spark e utilizzarlo da Cloud Storage, devi concedere le autorizzazioni necessarie Identificazione del servizio di connessione Spark. Devi concedere ai l'account di servizio della connessione e l'account IAM di storage.objects.get o il ruolo IAM storage.objectViewer.

Facoltativamente, puoi concedere all'account di servizio della connessione l'accesso a Dataproc Metastore e server di cronologia permanente di Dataproc se li hai specificati nella connessione. Per ulteriori informazioni, vedi Concedi l'accesso all'account di servizio.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Facoltativamente, puoi aggiungere i seguenti argomenti al codice precedente:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Sostituisci quanto segue:

  • CUSTOM_SERVICE_ACCOUNT: campo obbligatorio. Un account di servizio personalizzato fornito da te.
  • BUCKET_NAME: facoltativo. Il bucket Cloud Storage utilizzato come file system dell'applicazione Spark predefinita. Se non viene specificato, nel progetto viene creato un bucket Cloud Storage predefinito e il bucket viene condiviso da tutti i job in esecuzione nello stesso progetto.
  • DATASET: facoltativo. Il set di dati in cui archiviare i dati temporanei prodotti richiamando la procedura. La pulizia dei dati viene eseguita al termine del job. Se non viene specificato, viene creato un set di dati temporaneo predefinito per il job.

Il tuo account di servizio personalizzato deve disporre delle seguenti autorizzazioni:

  • Per leggere e scrivere nel bucket gestione temporanea utilizzato come file system dell'applicazione Spark predefinito:

    • storage.objects.* autorizzazioni o il ruolo IAM roles/storage.objectAdmin per il bucket gestione temporanea che specificare.
    • Inoltre, le autorizzazioni storage.buckets.* o il ruolo IAM roles/storage.Admin nel progetto se il bucket gestione temporanea non è specificato.
  • (Facoltativo) Per leggere e scrivere dati da e in BigQuery:

    • bigquery.tables.* nelle tue tabelle BigQuery.
    • bigquery.readsessions.* sul tuo progetto.
    • Il ruolo IAM roles/bigquery.admin include le autorizzazioni precedenti.
  • (Facoltativo) Per leggere e scrivere dati da e in Cloud Storage:

    • storage.objects.* o il ruolo IAM roles/storage.objectAdmin per gli oggetti Cloud Storage.
  • (Facoltativo) Per leggere e scrivere nel set di dati gestione temporanea utilizzato per i parametri INOUT/OUT:

    • Ruolo IAM bigquery.tables.* o roles/bigquery.dataEditor nel set di dati gestione temporanea specificato.
    • Inoltre, l'autorizzazione bigquery.datasets.create o il ruolo IAM roles/bigquery.dataEditor nella progetto se il set di dati gestione temporanea non è specificato.

Esempi di stored procedure per Spark

Questa sezione mostra esempi di come creare una stored procedure per Apache Spark.

Usa un file PySpark o JAR in Cloud Storage

L'esempio seguente mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection e un server PySpark o JAR archiviato in un bucket Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java o Scala

Utilizza main_file_uri per creare una stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Utilizza main_class per creare una stored procedure:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Usa codice in linea

L'esempio seguente mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection codice PySpark incorporato:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Trasmettere un valore come parametro di input

I seguenti esempi mostrano i due metodi per passare un valore come input in Python:

Metodo 1: utilizza le variabili di ambiente

Nel codice PySpark puoi ottenere i parametri di input della stored procedure per Spark tramite le variabili di ambiente nel driver Spark esecutori. Il nome della variabile di ambiente è nel formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, dove PARAMETER_NAME è il nome del parametro di input. Per Ad esempio, se il nome del parametro di input è var, il nome del la variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var. L'input sono codificati in formato JSON. Nel codice PySpark, puoi ottenere il valore parametro di input in formato JSON dalla variabile di ambiente e la decodifica in una variabile Python.

L'esempio seguente mostra come ottenere il valore di un parametro di input di tipo INT64 nel tuo codice PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Metodo 2: usa una libreria integrata

Nel codice PySpark, puoi semplicemente importare una libreria integrata e utilizzarla per per compilare tutti i tipi di parametri. Per passare i parametri agli esecutori, compila i parametri in un driver Spark come variabili Python e passare i valori esecutori. La libreria integrata supporta la maggior parte delle tipi di dati tranne INTERVAL, GEOGRAPHY, NUMERIC e BIGNUMERIC.

Tipo di dati BigQuery Tipo di dati Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Non supportato
BIGNUMERIC Non supportato
INTERVAL Non supportato
GEOGRAPHY Non supportato

L'esempio seguente mostra come importare la libreria integrata e utilizzarla per compilare un parametro di input di tipo INT64 e un parametro di input di tipo ARRAY<STRUCT<a INT64, b STRING>> nel codice PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Nel codice Java o Scala, puoi ottenere i parametri di input del per Spark tramite le variabili di ambiente nel driver Spark esecutori. Il nome della variabile di ambiente è nel formato BIGQUERY_PROC_PARAM.PARAMETER_NAME, dove PARAMETER_NAME è il nome del parametro di input. Ad esempio, se il nome del parametro di input è var, la macro il nome della variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var. Nel tuo codice Java o Scala, puoi ottenere il valore parametro di input dalla variabile di ambiente.

L'esempio seguente mostra come ottenere il valore di un parametro di input da variabili di ambiente nel codice Scala:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

L'esempio seguente mostra come ottenere parametri di input dalle variabili di ambiente nel tuo codice Java:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Trasmetti i valori come parametri OUT e INOUT

I parametri di output restituiscono il valore dalla procedura Spark, mentre INOUT accetta un valore per la procedura e restituisce un valore dalla procedura. Per utilizzare i parametri OUT e INOUT, aggiungi la parola chiave OUT o INOUT prima del nome del parametro durante la creazione della procedura Spark. In PySpark codice, utilizzi la libreria integrata per restituire un valore come OUT o INOUT . Come i parametri di input, la libreria integrata supporta la maggior parte dei Tipi di dati BigQuery, tranne INTERVAL, GEOGRAPHY, NUMERIC, e BIGNUMERIC. I valori di tipo TIME e DATETIME vengono convertiti nel fuso orario UTC il fuso orario quando viene restituito come parametri OUT o INOUT.


CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Leggere da una tabella Hive Metastore e scrivere i risultati in BigQuery

L'esempio seguente mostra come trasformare una tabella Hive Metastore e scrivi i risultati in BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Visualizza i filtri dei log

Dopo aver chiamato una stored procedure per Spark, possono visualizzare le informazioni di log. per ottenere le informazioni del filtro di Cloud Logging. e l'endpoint del cluster di cronologia Spark, utilizza il comando bq show. Le informazioni del filtro sono disponibili nel campo SparkStatistics della lavoro secondario. Per ottenere i filtri di log, segui questi passaggi:

  1. Vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor query, elenca i job figlio della stored procedure job di script:

    bq ls -j --parent_job_id=$parent_job_id
    

    Per scoprire come ottenere l'ID job, consulta Visualizzare i dettagli del job.

    L'output è simile al seguente:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. Identifica jobId per la stored procedure e utilizza il comando bq show per visualizzare i dettagli del job:

    bq show --format=prettyjson --job $child_job_id
    

    Copia il campo sparkStatistics perché ti serve in un altro passaggio.

    L'output è simile al seguente:

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. Per Logging, genera filtri di log con i campi SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation
    
    

    I log sono scritti nel bigquery.googleapis.com/SparkJob monitorato risorsa. I log sono etichettati dai componenti INFO, DRIVER e EXECUTOR. Per filtrare i log da il driver Spark, aggiungi il componente labels.component = "DRIVER" a i filtri dei log. Per filtrare i log dall'esecutore Spark, aggiungi il componente labels.component = "EXECUTOR" ai filtri di log.

Utilizza la chiave di crittografia gestita dal cliente

La procedura BigQuery Spark utilizza la chiave di crittografia gestita dal cliente (CMEK) per proteggere i tuoi contenuti, insieme alla crittografia predefinita fornita da BigQuery. Per utilizzare CMEK nella procedura Spark, attiva prima la creazione dell'account di servizio di crittografia BigQuery e concedi le autorizzazioni necessarie. La procedura Spark supporta anche i criteri dell'organizzazione CMEK se vengono applicati al progetto.

Se la tua stored procedure utilizza la modalità di sicurezza INVOKER, la tua CMEK dovrebbe essere specificata tramite la variabile di sistema SQL durante la chiamata della procedura. In caso contrario, la tua CMEK può essere specificata tramite la connessione associata alla stored procedure.

Per specificare la CMEK tramite la connessione quando crei una stored procedure Spark, utilizza il codice campione seguente:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Per specificare CMEK tramite la variabile di sistema SQL durante la chiamata della procedura, utilizza il seguente codice campione:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Utilizza Controlli di servizio VPC

Controlli di servizio VPC consente di configurare un perimetro sicuro per prevenire l'esfiltrazione di dati. Per utilizzare Controlli di servizio VPC con una procedura Spark per ulteriore sicurezza, devi prima creare un perimetro di servizio.

Per proteggere completamente i job di procedura Spark, aggiungi le API seguenti al perimetro di servizio:

  • API BigQuery (bigquery.googleapis.com)
  • API Cloud Logging (logging.googleapis.com)
  • API Cloud Storage (storage.googleapis.com), se utilizzi Cloud Storage
  • API Artifact Registry (artifactregistry.googleapis.com) o Container Registry (containerregistry.googleapis.com), se utilizzi un container personalizzato
  • API Dataproc Metastore (metastore.googleapis.com) e API Cloud Run Admin (run.googleapis.com), se utilizzi Dataproc Metastore

Aggiungi il progetto di query della procedura Spark nel perimetro. Aggiungi altri progetti che ospitano nel perimetro il tuo codice o i tuoi dati Spark.

Best practice

  • Quando utilizzi una connessione nel progetto per la prima volta, sono necessari circa un minuto in più per eseguire il provisioning. Per risparmiare tempo, puoi riutilizzare un modello Connessione Spark quando crei una stored procedure Spark.

  • Quando crei una procedura Spark per l'uso in produzione, Google consiglia di specificare una versione del runtime. Per un elenco delle le versioni di runtime, consulta Versioni di runtime di Dataproc Serverless. Ti consigliamo di utilizzare la versione LTS (Long-Time-Support).

  • Quando specifichi un container personalizzato in una Spark consigliamo di utilizzare Artifact Registry e il flusso di immagini.

  • Per ottenere prestazioni migliori, puoi specificare proprietà di allocazione delle risorse nella procedura Spark. Spark le stored procedure supportano un elenco di proprietà di allocazione delle risorse Dataproc Serverless.

Limitazioni

  • Puoi utilizzare solo il protocollo endpoint gRPC per la connessione a Dataproc Metastore. Non sono ancora supportati altri tipi di Hive Metastore.
  • Le chiavi di crittografia gestite dal cliente (CMEK) sono disponibili solo quando i clienti e creare procedure Spark a regione singola. Regione globale Le chiavi CMEK e le chiavi CMEK a più regioni, ad esempio EU o US, non sono supportati.
  • Il passaggio dei parametri di output è supportato solo per PySpark.
  • Se il set di dati associato alla stored procedure Spark viene replicato in una regione di destinazione tramite replica del set di dati tra regioni, lo spazio può essere interrogato solo nella regione in cui è stata creata.

Quote e limiti

Per informazioni su quote e limiti, consulta le stored procedure per Spark quote e limiti.

Passaggi successivi