Mit gespeicherten Prozeduren für Apache Spark arbeiten

Dieses Dokument richtet sich an Data Engineers, Data Scientists und Datenanalysten, die gespeicherte Prozeduren für Spark in BigQuery erstellen und aufrufen möchten.

Mit BigQuery können Sie gespeicherte Spark-Prozeduren erstellen und ausführen, die in Python, Java oder Scala geschrieben sind. Sie können diese gespeicherten Prozeduren dann in BigQuery mithilfe einer GoogleSQL-Abfrage ausführen, ähnlich wie beim Ausführen von gespeicherten SQL-Prozeduren.

Hinweise

Bitten Sie Ihren Administrator, eine Spark-Verbindung zu erstellen und für Sie freizugeben, damit Sie eine gespeicherte Prozedur für Spark erstellen können. Ihr Administrator muss dem Dienstkonto, das der Verbindung zugeordnet ist, auch die erforderlichen IAM-Berechtigungen (Identity and Access Management) erteilen.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erteilen, die Sie zum Ausführen der Aufgaben in diesem Dokument benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Diese vordefinierten Rollen enthalten die Berechtigungen, die zum Ausführen der Aufgaben in diesem Dokument erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Ausführen der Aufgaben in diesem Dokument erforderlich:

  • Stellen Sie eine Verbindung her:
    • bigquery.connections.create
    • bigquery.connections.list
  • Gespeicherte Prozedur für Spark erstellen:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Gespeicherte Prozedur für Apache Spark aufrufen:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Standortberücksichtigung

Sie müssen eine gespeicherte Prozedur für Spark am selben Standort wie Ihre Verbindung erstellen, da die gespeicherte Prozedur am selben Standort wie die Verbindung ausgeführt wird. Wenn Sie beispielsweise ein gespeichertes Verfahren am multiregionalen Standort "US" erstellen möchten, verwenden Sie eine Verbindung am multiregionalen Standort "US".

Preise

  • Die Gebühren für das Ausführen von Spark-Prozeduren in BigQuery ähneln denen für das Ausführen von Spark-Prozeduren in Dataproc Serverless. Weitere Informationen finden Sie unter Dataproc Serverless-Preise.

  • Gespeicherte Spark-Prozeduren können mit dem On-Demand-Preismodell sowie mit jeder der BigQuery-Versionen verwendet werden. Spark-Verfahren werden in allen Fällen nach dem „Pay as you go“-Modell der BigQuery Enterprise Edition abgerechnet, unabhängig vom Computing-Preismodell. die in Ihrem Projekt verwendet werden.

  • Gespeicherte Spark-Prozeduren für BigQuery unterstützen keine Verwendung von Reservierungen oder Zusicherungen. Vorhandene Reservierungen und Zusicherungen werden weiterhin für andere unterstützte Abfragen und Verfahren verwendet. Die Gebühren für die Nutzung gespeicherter Spark-Verfahren werden Ihrer Rechnung in der Enterprise-Version hinzugefügt (Pay-as-you-go-Kosten). Die Rabatte für Ihre Organisation werden gegebenenfalls angewendet.

  • Während gespeicherte Prozeduren von Spark eine Spark-Ausführungs-Engine verwenden, fallen für die Spark-Ausführung keine separaten Gebühren an. Wie bereits erwähnt, werden entsprechende Gebühren als „Pay as you go“-SKU der BigQuery Enterprise-Version gemeldet.

  • Gespeicherte Spark-Prozeduren bieten keine kostenlose Stufe.

Gespeicherte Prozedur für Spark erstellen

Sie müssen die gespeicherte Prozedur am selben Speicherort wie die von Ihnen verwendete Verbindung erstellen.

Wenn der Text Ihrer gespeicherten Prozedur mehr als 1 MB beträgt, empfehlen wir, die gespeicherte Prozedur in einer Datei in einem Cloud Storage-Bucket zu speichern, anstatt Inline-Code zu verwenden. BigQuery bietet zwei Methoden zum Erstellen einer gespeicherten Prozedur für Spark mit Python:

SQL-Abfrageeditor verwenden

So erstellen Sie eine gespeicherte Prozedur für Spark im SQL-Abfrageeditor:

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Fügen Sie im Abfrageeditor den Beispielcode für die angezeigte CREATE PROCEDURE-Anweisung hinzu.

    Alternativ können Sie im Bereich Explorer auf die Verbindung in dem Projekt klicken, in dem Sie die Verbindungsressource erstellt haben. Zum Erstellen einer gespeicherten Prozedur für Spark klicken Sie dann auf Gespeicherte Prozedur erstellen.

    Python

    Verwenden Sie das folgende Codebeispiel, um gespeicherte Prozeduren für Spark in Python zu erstellen:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java oder Scala

    Verwenden Sie den folgenden Beispielcode, um eine gespeicherte Prozedur für Spark in Java oder Scala mit der Option main_file_uri zu erstellen:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Verwenden Sie den folgenden Beispielcode, um eine gespeicherte Prozedur für Spark in Java oder Scala mit den Optionen main_class und jar_uris zu erstellen:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: das Projekt, in dem Sie das gespeicherte Verfahren erstellen möchten, z. B. myproject.
    • DATASET: Das Dataset, in dem Sie die gespeicherte Prozedur erstellen möchten, z. B. mydataset.
    • PROCEDURE_NAME: der Name der gespeicherten Prozedur, die Sie in BigQuery ausführen möchten, z. B. mysparkprocedure.
    • PROCEDURE_ARGUMENT: ein Parameter zur Eingabe der Eingabeargumente.

      Geben Sie in diesem Parameter die folgenden Felder an:

      • ARGUMENT_MODE: der Modus des Arguments.

        Zulässige Werte sind z. B. IN, OUT und INOUT. Der Standardwert ist IN.

      • ARGUMENT_NAME: der Name des Arguments.
      • ARGUMENT_TYPE: der Typ des Arguments.

      Beispiel: myproject.mydataset.mysparkproc(num INT64).

      Weitere Informationen finden Sie in den Abschnitten zum Übergeben eines Werts als IN-Parameter oder OUT- und INOUT-Parameter in diesem Dokument.

    • CONNECTION_PROJECT_ID: das Projekt, das die Verbindung zum Ausführen des Spark-Verfahrens enthält.
    • CONNECTION_REGION: die Region, die die Verbindung zum Ausführen des Spark-Verfahrens enthält, z. B. us.
    • CONNECTION_ID: die Verbindungs-ID, z. B. myconnection.

      Wenn Sie sich Verbindungsdetails in der Google Cloud Console ansehen, ist die Verbindungs-ID der Wert im letzten Abschnitt der voll qualifizierten Verbindungs-ID, der unter Verbindungs-ID angezeigt wird, z. B. projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: die Laufzeitversion von Spark, z. B. 1.1.
    • MAIN_PYTHON_FILE_URI: der Pfad zu einer zweiten Datei, z. B. gs://mybucket/mypysparkmain.py.

      Wenn Sie den Text der gespeicherten Prozedur in der Anweisung CREATE PROCEDURE hinzufügen möchten, können Sie alternativ PYSPARK_CODE nach LANGUAGE PYTHON AS hinzufügen, wie im Beispiel unter Inline-Code verwenden in diesem Dokument gezeigt.

    • PYSPARK_CODE: die Definition einer PySpark-Anwendung in der Anweisung CREATE PROCEDURE, wenn Sie den Text der Prozedur inline übergeben möchten

      Der Wert ist ein Stringliteral. Wenn der Code Anführungszeichen und umgekehrte Schrägstriche enthält, müssen diese entweder mit Escapezeichen versehen oder als Rohstring dargestellt werden. Beispielsweise kann der Code "\n"; als einer der folgenden Werte dargestellt werden:

      • String in Anführungszeichen "return \"\\n\";" Sowohl Anführungszeichen als auch umgekehrte Schrägstriche werden maskiert.
      • String in drei Anführungszeichen: """return "\\n";""". Umgekehrte Schrägstriche werden maskiert, Anführungszeichen nicht.
      • Rohstring: r"""return "\n";""". Es ist keine Maskierung erforderlich.
      Weitere Informationen zum Einfügen von Inline-PySpark-Code finden Sie unter Inline-Code verwenden.
    • MAIN_JAR_URI: der Pfad der JAR-Datei, die die Klasse main enthält, z. B. gs://mybucket/my_main.jar.
    • CLASS_NAME: der voll qualifizierte Name einer Klasse in einem JAR-Satz mit der Option jar_uris, z. B. com.example.wordcount
    • URI: der Pfad der JAR-Datei, die die in der Klasse main angegebene Klasse enthält, z. B. gs://mybucket/mypysparkmain.jar.

    Weitere Optionen, die Sie in OPTIONS angeben können, finden Sie in der Liste der Verfahrensoptionen.

PySpark-Editor verwenden

Wenn Sie eine Prozedur mit dem PySpark-Editor erstellen, müssen Sie die Anweisung CREATE PROCEDURE nicht verwenden. Fügen Sie Ihren Python-Code stattdessen direkt in den PySpark-Editor ein und speichern oder führen Sie ihn aus.

So erstellen Sie eine gespeicherte Prozedur für Spark im PySpark-Editor:

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Wenn Sie den PySpark-Code direkt eingeben möchten, öffnen Sie den PySpark-Editor. Klicken Sie zum Öffnen des PySpark-Editors auf den Menü neben SQL-Abfrage erstellen und wählen Sie PySpark-Prozedur erstellen

  3. Klicken Sie zum Festlegen von Optionen auf Mehr > PySpark-Optionen und gehen Sie dann so vor:

    1. Geben Sie den Speicherort an, an dem Sie den PySpark-Code ausführen möchten.

    2. Geben Sie im Feld Verbindung die Spark-Verbindung an.

    3. Geben Sie im Abschnitt Aufruf der gespeicherten Prozedur das Dataset an, in dem die generierten temporären gespeicherten Prozeduren gespeichert werden sollen. Sie können entweder ein bestimmtes Dataset festlegen oder die Verwendung eines temporären Datasets zulassen, um den PySpark-Code aufzurufen.

      Das temporäre Dataset wird mit dem im vorherigen Schritt angegebenen Speicherort generiert. Wenn ein Dataset-Name angegeben ist, müssen sich das Dataset und die Spark-Verbindung am selben Speicherort befinden.

    4. Definieren Sie im Abschnitt Parameter die Parameter für die gespeicherte Prozedur. Der Wert des Parameters wird nur während der Ausführung des PySpark-Codes innerhalb von Sitzungen verwendet, aber die Deklaration selbst wird in der Prozedur gespeichert.

    5. Geben Sie im Abschnitt Erweiterte Optionen die Prozeduroptionen an. Eine detaillierte Liste der Prozeduroptionen finden Sie in der Liste der Prozeduroptionen.

    6. Fügen Sie im Bereich Attribute die Schlüssel/Wert-Paare hinzu, um den Job zu konfigurieren. Sie können jedes der Schlüssel/Wert-Paare aus den Dataproc Serverless Spark-Attributen verwenden.

    7. Geben Sie in den Dienstkontoeinstellungen das benutzerdefinierte Dienstkonto, den CMEK, das Staging-Dataset und den Staging-Cloud Storage-Ordner an, die während der Ausführung des PySpark-Codes innerhalb von Sitzungen verwendet werden sollen.

    8. Klicken Sie auf Speichern.

Gespeicherte Prozedur für Spark speichern

Nachdem Sie mit dem PySpark-Editor die gespeicherte Prozedur erstellt haben, können Sie die gespeicherte Prozedur speichern. Führen Sie dazu die folgenden Schritte aus:

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Erstellen Sie im Abfrageeditor eine gespeicherte Prozedur für Spark mithilfe von Python mit dem PySpark-Editor.

  3. Klicken Sie auf Speichern > Prozedur speichern.

  4. Geben Sie im Dialogfeld Gespeicherte Prozedur speichern den Namen des Datasets, in dem Sie die gespeicherte Prozedur speichern möchten, sowie den Namen der gespeicherten Prozedur an.

  5. Klicken Sie auf Speichern.

    Wenn Sie den PySpark-Code nur ausführen und nicht als gespeicherte Prozedur speichern möchten, klicken Sie auf Ausführen anstelle von Speichern.

Benutzerdefinierte Container verwenden

Der benutzerdefinierte Container stellt die Laufzeitumgebung für die Treiber- und Executor-Prozesse der Arbeitslast bereit. Verwenden Sie den folgenden Beispielcode, um benutzerdefinierte Container zu verwenden:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Ersetzen Sie Folgendes:

  • PROJECT_ID: das Projekt, in dem Sie das gespeicherte Verfahren erstellen möchten, z. B. myproject.
  • DATASET: Das Dataset, in dem Sie die gespeicherte Prozedur erstellen möchten, z. B. mydataset.
  • PROCEDURE_NAME: der Name der gespeicherten Prozedur, die Sie in BigQuery ausführen möchten, z. B. mysparkprocedure.
  • PROCEDURE_ARGUMENT: ein Parameter zur Eingabe der Eingabeargumente.

    Geben Sie in diesem Parameter die folgenden Felder an:

    • ARGUMENT_MODE: der Modus des Arguments.

      Zulässige Werte sind z. B. IN, OUT und INOUT. Der Standardwert ist IN.

    • ARGUMENT_NAME: der Name des Arguments.
    • ARGUMENT_TYPE: der Typ des Arguments.

    Beispiel: myproject.mydataset.mysparkproc(num INT64).

    Weitere Informationen finden Sie in den Abschnitten zum Übergeben eines Werts als IN-Parameter oder OUT- und INOUT-Parameter in diesem Dokument.

  • CONNECTION_PROJECT_ID: das Projekt, das die Verbindung zum Ausführen des Spark-Verfahrens enthält.
  • CONNECTION_REGION: die Region, die die Verbindung zum Ausführen des Spark-Verfahrens enthält, z. B. us.
  • CONNECTION_ID: die Verbindungs-ID, z. B. myconnection.

    Wenn Sie sich Verbindungsdetails in der Google Cloud Console ansehen, ist die Verbindungs-ID der Wert im letzten Abschnitt der voll qualifizierten Verbindungs-ID, der unter Verbindungs-ID angezeigt wird, z. B. projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: die Laufzeitversion von Spark, z. B. 1.1.
  • MAIN_PYTHON_FILE_URI: der Pfad zu einer zweiten Datei, z. B. gs://mybucket/mypysparkmain.py.

    Wenn Sie den Text der gespeicherten Prozedur in der Anweisung CREATE PROCEDURE hinzufügen möchten, können Sie alternativ PYSPARK_CODE nach LANGUAGE PYTHON AS hinzufügen, wie im Beispiel unter Inline-Code verwenden in diesem Dokument gezeigt.

  • PYSPARK_CODE: die Definition einer PySpark-Anwendung in der Anweisung CREATE PROCEDURE, wenn Sie den Text der Prozedur inline übergeben möchten

    Der Wert ist ein Stringliteral. Wenn der Code Anführungszeichen und umgekehrte Schrägstriche enthält, müssen diese entweder mit Escapezeichen versehen oder als Rohstring dargestellt werden. Beispielsweise kann der Code "\n"; als einer der folgenden Werte dargestellt werden:

    • String in Anführungszeichen "return \"\\n\";" Sowohl Anführungszeichen als auch umgekehrte Schrägstriche werden maskiert.
    • String in drei Anführungszeichen: """return "\\n";""". Umgekehrte Schrägstriche werden maskiert, Anführungszeichen nicht.
    • Rohstring: r"""return "\n";""". Es ist keine Maskierung erforderlich.
    Weitere Informationen zum Einfügen von Inline-PySpark-Code finden Sie unter Inline-Code verwenden.
  • CONTAINER_IMAGE: Pfad des Images in der Artefakt-Registry. Er darf nur Bibliotheken zur Verwendung in Ihrem Verfahren enthalten. Wenn keine Angabe erfolgt, wird das mit der Laufzeitversion verknüpfte Systemstandard-Container-Image verwendet.

Weitere Informationen zum Erstellen eines benutzerdefinierten Container-Images mit Spark finden Sie unter Benutzerdefiniertes Container-Image erstellen.

Gespeicherte Prozedur für Spark aufrufen

Nachdem Sie eine gespeicherte Prozedur erstellt haben, können Sie sie mit einer der folgenden Optionen aufrufen:

Console

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Maximieren Sie im Bereich Explorer Ihr Projekt und wählen Sie die gespeicherte Prozedur für Spark aus, die Sie ausführen möchten.

  3. Klicken Sie im Fenster Gespeicherte Prozedurinformationen auf Gespeicherte Prozedur aufrufen. Alternativ können Sie die Option Aktionen anzeigen maximieren und auf Aufrufen klicken.

  4. Klicken Sie auf Ausführen.

  5. Klicken Sie im Abschnitt Alle Ergebnisse auf Ergebnisse ansehen.

  6. Optional: Führen Sie im Abschnitt Abfrageergebnisse die folgenden Schritte aus:

    • Klicken Sie auf Ausführungsdetails, um die Spark-Treiberlogs aufzurufen.

    • Wenn Sie in Cloud Logging Logs aufrufen möchten, klicken Sie auf Jobinformationen und klicken Sie dann im Feld Protokoll auf Log.

    • Wenn Sie den Endpunkt für den Spark History Server abrufen möchten, klicken Sie auf Jobinformationen und dann auf Spark-History Server.

SQL

Verwenden Sie zum Aufrufen einer gespeicherten Prozedur die Anweisung CALL PROCEDURE:

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Geben Sie im Abfrageeditor die folgende Anweisung ein:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Klicken Sie auf Ausführen.

Informationen zum Ausführen von Abfragen finden Sie unter Interaktive Abfrage ausführen.

Benutzerdefiniertes Dienstkonto verwenden

Anstatt die Dienstidentität der Spark-Verbindung für den Datenzugriff zu verwenden, können Sie ein benutzerdefiniertes Dienstkonto für den Zugriff auf Daten in Ihrem Spark-Code verwenden.

Wenn Sie ein benutzerdefiniertes Dienstkonto verwenden möchten, geben Sie den Sicherheitsmodus INVOKER (mit der Anweisung EXTERNAL SECURITY INVOKER) beim Erstellen einer gespeicherten Spark-Prozedur an und geben Sie das Dienstkonto an, wenn Sie das Gespeicherte Prozedur.

Wenn Sie auf Spark-Code in Cloud Storage zugreifen und diesen verwenden möchten, müssen Sie der Dienst-ID der Spark-Verbindung die erforderlichen Berechtigungen erteilen. Sie müssen dem Dienstkonto der Verbindung die IAM-Berechtigung storage.objects.get oder die IAM-Rolle storage.objectViewer erteilen.

Optional können Sie dem Dienstkonto der Verbindung Zugriff auf Dataproc Metastore und Dataproc Persistent History Server gewähren, wenn Sie diese in der Verbindung angegeben haben. Weitere Informationen finden Sie unter Zugriff auf das Dienstkonto gewähren.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Optional können Sie dem vorherigen Code die folgenden Argumente hinzufügen:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Ersetzen Sie Folgendes:

  • CUSTOM_SERVICE_ACCOUNT: erforderlich. Ein von Ihnen bereitgestelltes benutzerdefiniertes Dienstkonto.
  • BUCKET_NAME: Optional. Der Cloud Storage-Bucket, der als Standarddateisystem der Spark-Anwendung verwendet wird. Wenn nicht angegeben, wird in Ihrem Projekt ein Cloud Storage-Standard-Bucket erstellt und der Bucket wird von allen Jobs gemeinsam genutzt, die unter demselben Projekt ausgeführt werden.
  • DATASET: Optional. Das Dataset zum Speichern der temporären Daten, die durch das Aufrufen der Prozedur erzeugt werden. Die Daten werden nach Abschluss des Jobs bereinigt. Wenn dies nicht angegeben ist, wird für den Job ein temporäres Standard-Dataset erstellt.

Ihr benutzerdefiniertes Dienstkonto muss die folgenden Berechtigungen haben:

  • So lesen und schreiben Sie in dem Staging-Bucket, der als Standarddateisystem der Spark-Anwendung verwendet wird:

    • Berechtigungen vom Typ storage.objects.* oder die IAM-Rolle roles/storage.objectAdmin für den von Ihnen angegebenen Staging-Bucket.
    • Außerdem die Berechtigungen storage.buckets.* oder die IAM-Rolle roles/storage.Admin für das Projekt, wenn der Staging-Bucket nicht angegeben ist.
  • (Optional) So lesen und schreiben Sie Daten aus und in BigQuery:

    • bigquery.tables.* für Ihre BigQuery-Tabellen.
    • bigquery.readsessions.* für Ihr Projekt
    • Die IAM-Rolle roles/bigquery.admin enthält die vorherigen Berechtigungen.
  • (Optional) So lesen und schreiben Sie Daten aus und in Cloud Storage:

    • storage.objects.*-Berechtigungen oder die IAM-Rolle roles/storage.objectAdmin für Ihre Cloud Storage-Objekte.
  • (Optional) So lesen und schreiben Sie in das Staging-Dataset, das für INOUT/OUT-Parameter verwendet wird:

    • IAM-Rolle bigquery.tables.* oder roles/bigquery.dataEditor für das von Ihnen angegebene Staging-Dataset.
    • Darüber hinaus die Berechtigung bigquery.datasets.create oder die IAM-Rolle roles/bigquery.dataEditor für das Projekt, wenn das Staging-Dataset nicht angegeben ist.

Beispiele für gespeicherte Prozeduren für Spark

In diesem Abschnitt wird anhand von Beispielen gezeigt, wie Sie ein gespeichertes Verfahren für Apache Spark erstellen.

PySpark- oder JAR-Datei in Cloud Storage verwenden

Das folgende Beispiel zeigt, wie Sie mithilfe der Verbindung my-project-id.us.my-connection und einer PySpark- oder JAR-Datei, die in einem Cloud Storage-Bucket gespeichert ist, eine gespeicherte Prozedur für Spark erstellen:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java oder Scala

Verwenden Sie main_file_uri, um eine gespeicherte Prozedur zu erstellen:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Verwenden Sie main_class, um eine gespeicherte Prozedur zu erstellen:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Inline-Code verwenden

Das folgende Beispiel zeigt, wie Sie eine gespeicherte Prozedur für Spark mithilfe der Verbindung my-project-id.us.my-connection und mit Inline-PySpark-Code erstellen:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Wert als Eingabeparameter übergeben

In den folgenden Beispielen werden die beiden Methoden zur Übergabe eines Werts als Eingabeparameter in Python gezeigt:

Methode 1: Umgebungsvariablen verwenden

Im PySpark-Code können Sie die Eingabeparameter der gespeicherten Prozedur für Spark über Umgebungsvariablen im Spark-Treiber und in den Executors abrufen. Der Name der Umgebungsvariablen hat das Format BIGQUERY_PROC_PARAM.PARAMETER_NAME, wobei PARAMETER_NAME der Name des Eingabeparameters ist. Wenn der Name des Eingabeparameters beispielsweise var lautet, lautet der Name der entsprechenden Umgebungsvariable BIGQUERY_PROC_PARAM.var. Die Eingabeparameter sind JSON-codiert. In Ihrem PySpark-Code können Sie den Wert des Eingabeparameters in einem JSON-String aus der Umgebungsvariablen abrufen und in eine Python-Variable decodieren.

Das folgende Beispiel zeigt, wie Sie den Wert eines Eingabeparameters vom Typ INT64 in Ihren PySpark-Code einfügen:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Methode 2: Integrierte Bibliothek verwenden

Im PySpark-Code können Sie einfach eine integrierte Bibliothek importieren und damit alle Parametertypen ausfüllen. Um die Parameter an Executors zu übergeben, füllen Sie die Parameter in einem Spark-Treiber als Python-Variablen aus und übergeben die Werte an Executors. Die integrierte Bibliothek unterstützt die meisten BigQuery-Datentypen mit Ausnahme von INTERVAL, GEOGRAPHY, NUMERIC und BIGNUMERIC.

BigQuery-Datentyp Python-Datentyp
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Nicht unterstützt
BIGNUMERIC Nicht unterstützt
INTERVAL Nicht unterstützt
GEOGRAPHY Nicht unterstützt

Das folgende Beispiel zeigt, wie Sie die integrierte Bibliothek importieren und dazu verwenden, einen Eingabeparameter vom Typ INT64 und einen Eingabeparameter vom Typ ARRAY<STRUCT<a INT64, b STRING>> in Ihrem PySpark-Code zu füllen:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Im Java- oder Scala-Code können Sie die Eingabeparameter der gespeicherten Prozedur für Spark über Umgebungsvariablen im Spark-Treiber und in den Executors abrufen. Der Name der Umgebungsvariablen hat das Format BIGQUERY_PROC_PARAM.PARAMETER_NAME, wobei PARAMETER_NAME der Name des Eingabeparameters ist. Wenn der Name des Eingabeparameters beispielsweise var lautet, lautet der Name der entsprechenden Umgebungsvariable BIGQUERY_PROC_PARAM.var. In Ihrem Java- oder Scala-Code können Sie den Wert des Eingabeparameters aus der Umgebungsvariablen abrufen.

Im folgenden Beispiel wird gezeigt, wie der Wert eines Eingabeparameters aus Umgebungsvariablen in den Scala-Code übernommen wird:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

Das folgende Beispiel zeigt, wie Eingabeparameter aus Umgebungsvariablen in Ihren Java-Code eingefügt werden:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Werte als OUT- und INOUT-Parameter übergeben

Ausgabeparameter geben den Wert aus der Spark-Prozedur zurück, während der INOUT-Parameter einen Wert für die Prozedur akzeptiert und einen Wert aus der Prozedur zurückgibt. Um die OUT- und INOUT-Parameter zu verwenden, fügen Sie beim Erstellen der Spark-Prozedur das Schlüsselwort OUT oder INOUT vor dem Parameternamen ein. Im PySpark-Code verwenden Sie die integrierte Bibliothek, um einen Wert als OUT- oder INOUT-Parameter zurückzugeben. Wie die Eingabeparameter unterstützt die integrierte Bibliothek die meisten BigQuery-Datentypen mit Ausnahme von INTERVAL, GEOGRAPHY, NUMERIC und BIGNUMERIC. Die Werte des Typs TIME und DATETIME werden in die UTC-Zeitzone konvertiert, wenn sie als OUT- oder INOUT-Parameter zurückgegeben werden.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Aus einer Hive Metastore-Tabelle lesen und Ergebnisse in BigQuery schreiben

Das folgende Beispiel zeigt, wie Sie eine Hive Metastore-Tabelle transformieren und die Ergebnisse in BigQuery schreiben:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Logfilter aufrufen

Nachdem Sie eine gespeicherte Prozedur für Spark aufgerufen haben, können Sie sich die Loginformationen ansehen. Verwenden Sie den Befehl bq show, um die Filterinformationen zum Cloud Logging und zum Spark-Verlaufscluster abzurufen. Die Filterinformationen sind im Feld SparkStatistics des untergeordneten Jobs verfügbar. So rufen Sie Logfilter ab:

  1. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  2. Listen Sie im Abfrageeditor untergeordnete Jobs des Skriptjobs der gespeicherten Prozedur auf:

    bq ls -j --parent_job_id=$parent_job_id

    Informationen zum Abrufen der Job-ID finden Sie unter Jobdetails ansehen.

    Die Ausgabe sieht in etwa so aus:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Ermitteln Sie die jobId für Ihre gespeicherte Prozedur und rufen Sie mit dem Befehl bq show die Details des Jobs auf:

    bq show --format=prettyjson --job $child_job_id

    Kopieren Sie das Feld sparkStatistics, da Sie es in einem anderen Schritt benötigen.

    Die Ausgabe sieht in etwa so aus:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Für Logging generiert Log-Filter mit den Feldern SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Die Logs werden in die überwachte Ressource bigquery.googleapis.com/SparkJob geschrieben. Die Logs sind mit den Komponenten INFO, DRIVER und EXECUTOR gekennzeichnet. Fügen Sie den Logfiltern die Komponente labels.component = "DRIVER" hinzu, um Logs aus dem Spark-Treiber zu filtern. Fügen Sie den Logfiltern die Komponente labels.component = "EXECUTOR" hinzu, um Logs aus dem Spark-Treiber zu filtern.

Vom Kunden verwalteten Verschlüsselungsschlüssel verwenden

Das BigQuery-Spark-Verfahren verwendet den vom Kunden verwalteten Verschlüsselungsschlüssel (CMEK), um Ihre Inhalte zusammen mit der von BigQuery bereitgestellten Standardverschlüsselung zu schützen. Wenn Sie den CMEK im Spark-Verfahren verwenden möchten, lösen Sie zuerst die Erstellung des BigQuery-Verschlüsselungsdienstkontos aus und gewähren Sie die erforderlichen Berechtigungen. Das Spark-Verfahren unterstützt auch die CMEK-Organisationsrichtlinien, wenn diese auf Ihr Projekt angewendet werden.

Wenn Ihre gespeicherte Prozedur den Sicherheitsmodus INVOKER verwendet, sollte Ihr CMEK beim Aufrufen der Prozedur über die SQL-Systemvariable angegeben werden. Andernfalls kann Ihr CMEK über die Verbindung angegeben werden, die der gespeicherten Prozedur zugeordnet ist.

Verwenden Sie den folgenden Beispielcode, um den CMEK über die Verbindung beim Erstellen einer gespeicherten Spark-Prozedur anzugeben:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Verwenden Sie den folgenden Beispielcode, um CMEK beim Aufrufen der Prozedur über die SQL-Systemvariable anzugeben:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

VPC Service Controls verwenden

Mit VPC Service Controls können Sie einen sicheren Perimeter einrichten, der vor Daten-Exfiltration schützt. Wenn Sie VPC Service Controls mit einer Spark-Prozedur für zusätzliche Sicherheit verwenden möchten, erstellen Sie zuerst einen Dienstperimeter.

Fügen Sie dem Dienstperimeter die folgenden APIs hinzu, um Ihre Spark-Prozedurjobs vollständig zu schützen:

  • BigQuery API (bigquery.googleapis.com)
  • Cloud Logging API (logging.googleapis.com)
  • Cloud Storage API (storage.googleapis.com), wenn Sie Cloud Storage verwenden
  • Artifact Registry API (artifactregistry.googleapis.com) oder Container Registry API (containerregistry.googleapis.com), wenn Sie einen benutzerdefinierten Container verwenden
  • Dataproc Metastore API (metastore.googleapis.com) und Cloud Run Admin API (run.googleapis.com), wenn Sie Dataproc Metastore verwenden

Fügen Sie das Abfrageprojekt der Spark-Prozedur dem Perimeter hinzu. Fügen Sie dem Perimeter weitere Projekte hinzu, die Ihren Spark-Code oder Ihre Daten hosten.

Best Practices

  • Wenn Sie zum ersten Mal eine Verbindung in Ihrem Projekt verwenden, dauert die Bereitstellung etwa eine Minute zusätzlich. Um Zeit zu sparen, können Sie eine vorhandene Spark-Verbindung wiederverwenden, wenn Sie eine gespeicherte Prozedur für Spark erstellen.

  • Wenn Sie eine Spark-Prozedur für die Produktion erstellen, empfiehlt Google, eine Laufzeitversion anzugeben. Eine Liste der unterstützten Laufzeitversionen finden Sie unter Serverlose Dataproc-Laufzeitversionen. Wir empfehlen, die LTS-Version zu verwenden.

  • Wenn Sie in einer Spark-Prozedur einen benutzerdefinierten Container angeben, empfehlen wir die Verwendung von Artifact Registry und Image-Streaming.

  • Für eine bessere Leistung können Sie im Spark-Verfahren Attribute für die Ressourcenzuweisung angeben. Gespeicherte Spark-Verfahren unterstützen eine Liste von Attributen für die Ressourcenzuweisung, die mit Dataproc Serverless identisch sind.

Beschränkungen

  • Sie können nur das gRPC-Endpunktprotokoll verwenden, um eine Verbindung zu Dataproc Metastore herzustellen. Andere Arten von Hive Metastore werden noch nicht unterstützt.
  • Vom Kunden verwaltete Verschlüsselungsschlüssel (CMEK) sind nur verfügbar, wenn Kunden Spark-Prozeduren für eine einzelne Region erstellen. Globale regionale CMEK-Schlüssel und multiregionale CMEK-Schlüssel wie EU oder US werden nicht unterstützt.
  • Die Übergabe von Ausgabeparametern wird nur für PySpark unterstützt.
  • Wenn das mit der gespeicherten Prozedur für Spark verknüpfte Dataset über eine regionenübergreifende Dataset-Replikation in eine Zielregion repliziert wird, kann die gespeicherte Prozedur nur in der Region abgefragt werden, die in der sie erstellt wurde.

Kontingente und Limits

Informationen zu Kontingenten und Limits finden Sie unter Kontingente und Limits für gespeicherte Prozeduren für Spark.

Nächste Schritte