Mit gespeicherten Prozeduren für Apache Spark arbeiten
Dieses Dokument richtet sich an Data Engineers, Data Scientists und Datenanalysten, die gespeicherte Prozeduren für Spark in BigQuery erstellen und aufrufen möchten.
Mit BigQuery können Sie gespeicherte Spark-Prozeduren erstellen und ausführen, die in Python, Java oder Scala geschrieben sind. Sie können diese gespeicherten Prozeduren dann in BigQuery mithilfe einer GoogleSQL-Abfrage ausführen, ähnlich wie beim Ausführen von gespeicherten SQL-Prozeduren.
Hinweise
Bitten Sie Ihren Administrator, eine Spark-Verbindung zu erstellen und für Sie freizugeben, damit Sie eine gespeicherte Prozedur für Spark erstellen können. Ihr Administrator muss dem Dienstkonto, das der Verbindung zugeordnet ist, auch die erforderlichen IAM-Berechtigungen (Identity and Access Management) erteilen.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erteilen, die Sie zum Ausführen der Aufgaben in diesem Dokument benötigen:
-
Gespeicherte Prozedur für Spark erstellen:
-
BigQuery-Dateneditor (
roles/bigquery.dataEditor
) für das Dataset, in dem Sie die gespeicherte Prozedur erstellen -
BigQuery Connection-Administrator (
roles/bigquery.connectionAdmin
) für die Verbindung, die vom gespeicherten Verfahren verwendet wird -
BigQuery-Jobnutzer (
roles/bigquery.jobUser
) für Ihr Projekt
-
BigQuery-Dateneditor (
-
Gespeicherte Prozedur für Apache Spark aufrufen:
-
BigQuery Metadata-Betrachter (
roles/bigquery.metadataViewer
) für das Dataset, in dem die gespeicherte Prozedur gespeichert ist -
BigQuery-Verbindungsnutzer (
roles/bigquery.connectionUser
) für die Verbindung -
BigQuery-Jobnutzer (
roles/bigquery.jobUser
) für Ihr Projekt
-
BigQuery Metadata-Betrachter (
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Diese vordefinierten Rollen enthalten die Berechtigungen, die zum Ausführen der Aufgaben in diesem Dokument erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:
Erforderliche Berechtigungen
Die folgenden Berechtigungen sind zum Ausführen der Aufgaben in diesem Dokument erforderlich:
-
Erstellen Sie eine Verbindung:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Gespeicherte Prozedur für Spark erstellen:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Gespeicherte Prozedur für Spark aufrufen:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.
Standortberücksichtigung
Sie müssen eine gespeicherte Prozedur für Spark am selben Standort wie Ihre Verbindung erstellen, da die gespeicherte Prozedur am selben Standort wie die Verbindung ausgeführt wird. Wenn Sie beispielsweise ein gespeichertes Verfahren am multiregionalen Standort "US" erstellen möchten, verwenden Sie eine Verbindung am multiregionalen Standort "US".
Preise
Die Kosten für das Ausführen von Spark-Prozeduren in BigQuery ähneln den Kosten für das Ausführen von Spark-Prozeduren in Dataproc Serverless. Weitere Informationen finden Sie unter Dataproc Serverless-Preise.
Spark-gespeicherte Prozeduren können sowohl mit dem On-Demand-Preismodell als auch mit allen BigQuery-Versionen verwendet werden. Spark-Verfahren werden in allen Fällen nach dem Pay-as-you-go-Modell der BigQuery Enterprise-Version abgerechnet, unabhängig vom in Ihrem Projekt verwendeten Preismodell für die Berechnung.
Spark-gespeicherte Prozeduren für BigQuery unterstützen keine Reservierungen oder Zusicherungen. Vorhandene Reservierungen und Zusicherungen werden weiterhin für andere unterstützte Abfragen und Verfahren verwendet. Die Gebühren für die Nutzung gespeicherter Spark-Verfahren werden Ihrer Rechnung in der Enterprise-Version hinzugefügt (Pay-as-you-go-Kosten). Sofern zutreffend, werden die Rabatte Ihrer Organisation angewendet.
Für Spark-gespeicherte Prozeduren wird zwar eine Spark-Ausführungs-Engine verwendet, aber es fallen keine separaten Gebühren für die Spark-Ausführung an. Wie bereits erwähnt, werden die entsprechenden Kosten als Pay-as-you-go-SKU für die BigQuery Enterprise Edition ausgewiesen.
Für Spark-gespeicherte Prozeduren gibt es keine kostenlose Stufe.
Gespeicherte Prozedur für Spark erstellen
Sie müssen die gespeicherte Prozedur am selben Speicherort wie die von Ihnen verwendete Verbindung erstellen.
Wenn der Text Ihrer gespeicherten Prozedur mehr als 1 MB beträgt, empfehlen wir, die gespeicherte Prozedur in einer Datei in einem Cloud Storage-Bucket zu speichern, anstatt Inline-Code zu verwenden. BigQuery bietet zwei Methoden zum Erstellen einer gespeicherten Prozedur für Spark mit Python:
- Wenn Sie die Anweisung
CREATE PROCEDURE
verwenden möchten, verwenden Sie den SQL-Abfrageeditor. - Wenn Sie Python-Code direkt eingeben möchten, verwenden Sie den PySpark-Editor. Sie können den Code als gespeicherte Prozedur speichern.
SQL-Abfrage-Editor verwenden
So erstellen Sie eine gespeicherte Prozedur für Spark im SQL-Abfrageeditor:
Rufen Sie die Seite BigQuery auf.
Fügen Sie im Abfrageeditor den Beispielcode für die angezeigte
CREATE PROCEDURE
-Anweisung hinzu.Alternativ können Sie im Bereich Explorer auf die Verbindung in dem Projekt klicken, in dem Sie die Verbindungsressource erstellt haben. Zum Erstellen einer gespeicherten Prozedur für Spark klicken Sie dann auf
Gespeicherte Prozedur erstellen.Python
Verwenden Sie das folgende Codebeispiel, um gespeicherte Prozeduren für Spark in Python zu erstellen:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java oder Scala
Verwenden Sie den folgenden Code, um eine gespeicherte Prozedur für Spark in Java oder Scala mit der Option
main_file_uri
zu erstellen:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Verwenden Sie den folgenden Code, um eine gespeicherte Prozedur für Spark in Java oder Scala mit den Optionen
main_class
undjar_uris
zu erstellen:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Ersetzen Sie Folgendes:
PROJECT_ID
: das Projekt, in dem Sie das gespeicherte Verfahren erstellen möchten, z. B.myproject
.DATASET
: Das Dataset, in dem Sie die gespeicherte Prozedur erstellen möchten, z. B.mydataset
.PROCEDURE_NAME
: der Name der gespeicherten Prozedur, die Sie in BigQuery ausführen möchten, z. B.mysparkprocedure
.PROCEDURE_ARGUMENT
: ein Parameter zur Eingabe der Eingabeargumente.Geben Sie in diesem Parameter die folgenden Felder an:
ARGUMENT_MODE
: der Modus des Arguments.Zulässige Werte sind z. B.
IN
,OUT
undINOUT
. Der Standardwert istIN
.ARGUMENT_NAME
: der Name des Arguments.ARGUMENT_TYPE
: der Typ des Arguments.
Beispiel:
myproject.mydataset.mysparkproc(num INT64)
.Weitere Informationen finden Sie in den Abschnitten zum Übergeben eines Werts als
IN
-Parameter oderOUT
- undINOUT
-Parameter in diesem Dokument.CONNECTION_PROJECT_ID
: das Projekt, das die Verbindung zum Ausführen des Spark-Verfahrens enthält.CONNECTION_REGION
: die Region, die die Verbindung zum Ausführen des Spark-Verfahrens enthält, z. B.us
.CONNECTION_ID
: die Verbindungs-ID, z. B.myconnection
.Wenn Sie sich Verbindungsdetails in der Google Cloud -Konsole ansehen, ist die Verbindungs-ID der Wert im letzten Abschnitt der voll qualifizierten Verbindungs-ID, der unter Verbindungs-ID angezeigt wird, z. B.
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: die Laufzeitversion von Spark, z. B.1.1
.MAIN_PYTHON_FILE_URI
: der Pfad zu einer zweiten Datei, z. B.gs://mybucket/mypysparkmain.py
.Wenn Sie den Text der gespeicherten Prozedur in der Anweisung
CREATE PROCEDURE
hinzufügen möchten, können Sie alternativPYSPARK_CODE
nachLANGUAGE PYTHON AS
hinzufügen, wie im Beispiel unter Inline-Code verwenden in diesem Dokument gezeigt.PYSPARK_CODE
: die Definition einer PySpark-Anwendung in der AnweisungCREATE PROCEDURE
, wenn Sie den Text der Prozedur inline übergeben möchtenDer Wert ist ein Stringliteral. Wenn der Code Anführungszeichen und umgekehrte Schrägstriche enthält, müssen diese entweder mit Escapezeichen versehen oder als Rohstring dargestellt werden. Beispielsweise kann der Code
"\n";
als einer der folgenden Werte dargestellt werden:- String in Anführungszeichen
"return \"\\n\";"
Sowohl Anführungszeichen als auch umgekehrte Schrägstriche werden maskiert. - String in drei Anführungszeichen:
"""return "\\n";"""
. Umgekehrte Schrägstriche werden maskiert, Anführungszeichen nicht. - Rohstring:
r"""return "\n";"""
. Es ist keine Maskierung erforderlich.
- String in Anführungszeichen
MAIN_JAR_URI
: der Pfad zur JAR-Datei, die diemain
-Klasse enthält, z. B.gs://mybucket/my_main.jar
.CLASS_NAME
: der voll qualifizierte Name einer Klasse in einem JAR-Satz mit der Optionjar_uris
, z. B.com.example.wordcount
URI
: der Pfad zur JAR-Datei, die die in dermain
-Klasse angegebene Klasse enthält, z. B.gs://mybucket/mypysparkmain.jar
.
Weitere Optionen, die Sie in
OPTIONS
angeben können, finden Sie in der Liste der Verfahrensoptionen.
PySpark-Editor verwenden
Wenn Sie eine Prozedur mit dem PySpark-Editor erstellen, müssen Sie die Anweisung CREATE PROCEDURE
nicht verwenden. Fügen Sie den Python-Code stattdessen direkt in den PySpark-Editor ein und speichern oder führen Sie ihn aus.
So erstellen Sie eine gespeicherte Prozedur für Spark im PySpark-Editor:
Rufen Sie die Seite BigQuery auf.
Wenn Sie den PySpark-Code direkt eingeben möchten, öffnen Sie den PySpark-Editor. Klicken Sie zum Öffnen des PySpark-Editors auf das Menü
neben SQL-Abfrage erstellen und wählen Sie PySpark-Prozedur erstellen aus.Klicken Sie auf Mehr > PySpark-Optionen und gehen Sie dann so vor:
Geben Sie den Speicherort an, an dem Sie den PySpark-Code ausführen möchten.
Geben Sie im Feld Verbindung die Spark-Verbindung an.
Geben Sie im Abschnitt Aufruf der gespeicherten Prozedur das Dataset an, in dem die generierten temporären gespeicherten Prozeduren gespeichert werden sollen. Sie können entweder ein bestimmtes Dataset festlegen oder die Verwendung eines temporären Datasets zulassen, um den PySpark-Code aufzurufen.
Das temporäre Dataset wird mit dem im vorherigen Schritt angegebenen Speicherort generiert. Wenn ein Dataset-Name angegeben ist, müssen sich das Dataset und die Spark-Verbindung am selben Speicherort befinden.
Definieren Sie im Abschnitt Parameter Parameter für die gespeicherte Prozedur. Der Wert des Parameters wird nur während der Ausführung des PySpark-Codes innerhalb von Sitzungen verwendet, aber die Deklaration selbst wird in der Prozedur gespeichert.
Geben Sie im Abschnitt Erweiterte Optionen die Prozeduroptionen an. Eine detaillierte Liste der Prozeduroptionen finden Sie in der Liste der Prozeduroptionen.
Fügen Sie im Bereich Attribute die Schlüssel/Wert-Paare hinzu, um den Job zu konfigurieren. Sie können beliebige Schlüssel/Wert-Paare aus den Dataproc Serverless Spark-Properties verwenden.
Geben Sie unter Dienstkontoeinstellungen das benutzerdefinierte Dienstkonto, CMEK, das Staging-Dataset und den Staging-Cloud Storage-Ordner an, die während der Sitzungsausführungen des PySpark-Codes verwendet werden sollen.
Klicken Sie auf Speichern.
Gespeicherte Prozedur für Spark speichern
Nachdem Sie mit dem PySpark-Editor die gespeicherte Prozedur erstellt haben, können Sie die gespeicherte Prozedur speichern. Führen Sie dazu die folgenden Schritte aus:
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Erstellen Sie im Abfrageeditor eine gespeicherte Prozedur für Spark mithilfe von Python mit dem PySpark-Editor.
Klicken Sie auf Speichern > Prozedur speichern.
Geben Sie im Dialogfeld Gespeicherte Prozedur speichern den Namen des Datasets, in dem Sie die gespeicherte Prozedur speichern möchten, sowie den Namen der gespeicherten Prozedur an.
Klicken Sie auf Speichern.
Wenn Sie den PySpark-Code nur ausführen und nicht als gespeicherte Prozedur speichern möchten, klicken Sie auf Ausführen anstelle von Speichern.
Benutzerdefinierte Container verwenden
Der benutzerdefinierte Container bietet die Laufzeitumgebung für die Treiber- und Executor-Prozesse der Arbeitslast. Verwenden Sie den folgenden Beispielcode, um benutzerdefinierte Container zu verwenden:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Ersetzen Sie Folgendes:
PROJECT_ID
: das Projekt, in dem Sie das gespeicherte Verfahren erstellen möchten, z. B.myproject
.DATASET
: Das Dataset, in dem Sie die gespeicherte Prozedur erstellen möchten, z. B.mydataset
.PROCEDURE_NAME
: der Name der gespeicherten Prozedur, die Sie in BigQuery ausführen möchten, z. B.mysparkprocedure
.PROCEDURE_ARGUMENT
: ein Parameter zur Eingabe der Eingabeargumente.Geben Sie in diesem Parameter die folgenden Felder an:
ARGUMENT_MODE
: der Modus des Arguments.Zulässige Werte sind z. B.
IN
,OUT
undINOUT
. Der Standardwert istIN
.ARGUMENT_NAME
: der Name des Arguments.ARGUMENT_TYPE
: der Typ des Arguments.
Beispiel:
myproject.mydataset.mysparkproc(num INT64)
.Weitere Informationen finden Sie in den Abschnitten zum Übergeben eines Werts als
IN
-Parameter oderOUT
- undINOUT
-Parameter in diesem Dokument.CONNECTION_PROJECT_ID
: das Projekt, das die Verbindung zum Ausführen des Spark-Verfahrens enthält.CONNECTION_REGION
: die Region, die die Verbindung zum Ausführen des Spark-Verfahrens enthält, z. B.us
.CONNECTION_ID
: die Verbindungs-ID, z. B.myconnection
.Wenn Sie sich Verbindungsdetails in der Google Cloud -Konsole ansehen, ist die Verbindungs-ID der Wert im letzten Abschnitt der voll qualifizierten Verbindungs-ID, der unter Verbindungs-ID angezeigt wird, z. B.
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: die Laufzeitversion von Spark, z. B.1.1
.MAIN_PYTHON_FILE_URI
: der Pfad zu einer zweiten Datei, z. B.gs://mybucket/mypysparkmain.py
.Wenn Sie den Text der gespeicherten Prozedur in der Anweisung
CREATE PROCEDURE
hinzufügen möchten, können Sie alternativPYSPARK_CODE
nachLANGUAGE PYTHON AS
hinzufügen, wie im Beispiel unter Inline-Code verwenden in diesem Dokument gezeigt.PYSPARK_CODE
: die Definition einer PySpark-Anwendung in der AnweisungCREATE PROCEDURE
, wenn Sie den Text der Prozedur inline übergeben möchtenDer Wert ist ein Stringliteral. Wenn der Code Anführungszeichen und umgekehrte Schrägstriche enthält, müssen diese entweder mit Escapezeichen versehen oder als Rohstring dargestellt werden. Beispielsweise kann der Code
"\n";
als einer der folgenden Werte dargestellt werden:- String in Anführungszeichen
"return \"\\n\";"
Sowohl Anführungszeichen als auch umgekehrte Schrägstriche werden maskiert. - String in drei Anführungszeichen:
"""return "\\n";"""
. Umgekehrte Schrägstriche werden maskiert, Anführungszeichen nicht. - Rohstring:
r"""return "\n";"""
. Es ist keine Maskierung erforderlich.
- String in Anführungszeichen
CONTAINER_IMAGE
: Pfad des Bildes in der Artifact Registry. Sie darf nur Bibliotheken enthalten, die in Ihrem Verfahren verwendet werden. Wenn keine Angabe erfolgt, wird das Container-Image des Systems verwendet, das mit der Laufzeitversion verknüpft ist.
Weitere Informationen zum Erstellen eines benutzerdefinierten Container-Images mit Spark finden Sie unter Benutzerdefiniertes Container-Image erstellen.
Gespeicherte Prozedur für Spark aufrufen
Nachdem Sie eine gespeicherte Prozedur erstellt haben, können Sie sie mit einer der folgenden Optionen aufrufen:
Console
Rufen Sie die Seite BigQuery auf.
Maximieren Sie im Bereich Explorer Ihr Projekt und wählen Sie die gespeicherte Prozedur für Spark aus, die Sie ausführen möchten.
Klicken Sie im Fenster Gespeicherte Prozedurinformationen auf Gespeicherte Prozedur aufrufen. Alternativ können Sie die Option Aktionen anzeigen maximieren und auf Aufrufen klicken.
Klicken Sie auf Ausführen.
Klicken Sie im Abschnitt Alle Ergebnisse auf Ergebnisse ansehen.
Optional: Führen Sie im Abschnitt Abfrageergebnisse die folgenden Schritte aus:
Klicken Sie auf Ausführungsdetails, um die Spark-Treiberlogs aufzurufen.
Wenn Sie in Cloud Logging Logs aufrufen möchten, klicken Sie auf Jobinformationen und klicken Sie dann im Feld Protokoll auf Log.
Wenn Sie den Endpunkt für den Spark History Server abrufen möchten, klicken Sie auf Jobinformationen und dann auf Spark-History Server.
SQL
Verwenden Sie zum Aufrufen einer gespeicherten Prozedur die Anweisung CALL PROCEDURE
:
Öffnen Sie in der Google Cloud Console die Seite BigQuery.
Geben Sie im Abfrageeditor die folgende Anweisung ein:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Klicken Sie auf
Ausführen.
Informationen zum Ausführen von Abfragen finden Sie unter Interaktive Abfrage ausführen.
Benutzerdefiniertes Dienstkonto verwenden
Anstatt die Dienstidentität der Spark-Verbindung für den Datenzugriff zu verwenden, können Sie ein benutzerdefiniertes Dienstkonto für den Zugriff auf Daten in Ihrem Spark-Code verwenden.
Wenn Sie ein benutzerdefiniertes Dienstkonto verwenden möchten, geben Sie beim Erstellen einer Spark-Speicherprozedur den Sicherheitsmodus INVOKER
(mit der Anweisung EXTERNAL SECURITY INVOKER
) und beim Aufrufen der gespeicherten Prozedur das Dienstkonto an.
Wenn Sie auf Spark-Code aus Cloud Storage zugreifen und ihn verwenden möchten, müssen Sie der Dienstidentität der Spark-Verbindung die erforderlichen Berechtigungen erteilen. Sie müssen dem Dienstkonto der Verbindung die IAM-Berechtigung storage.objects.get
oder die IAM-Rolle storage.objectViewer
zuweisen.
Optional können Sie dem Dienstkonto der Verbindung Zugriff auf den Dataproc Metastore und den Dataproc Persistent History Server gewähren, sofern Sie diese in der Verbindung angegeben haben. Weitere Informationen finden Sie unter Zugriff auf das Dienstkonto gewähren.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Optional können Sie dem vorherigen Code die folgenden Argumente hinzufügen:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Ersetzen Sie Folgendes:
CUSTOM_SERVICE_ACCOUNT
: erforderlich. Ein von Ihnen bereitgestelltes benutzerdefiniertes Dienstkonto.BUCKET_NAME
: Optional. Der Cloud Storage-Bucket, der als Standarddateisystem der Spark-Anwendung verwendet wird. Wenn Sie diese Angabe nicht machen, wird in Ihrem Projekt ein Cloud Storage-Standard-Bucket erstellt. Dieser Bucket wird von allen Jobs freigegeben, die im selben Projekt ausgeführt werden.DATASET
: Optional. Das Dataset, in dem die temporären Daten gespeichert werden, die durch den Aufruf der Prozedur generiert werden. Die Daten werden nach Abschluss des Jobs bereinigt. Wenn keine Angabe erfolgt, wird für den Job ein standardmäßiger temporärer Datensatz erstellt.
Ihr benutzerdefiniertes Dienstkonto muss die folgenden Berechtigungen haben:
So lesen und schreiben Sie im Staging-Bucket, der als Standarddateisystem der Spark-Anwendung verwendet wird:
storage.objects.*
-Berechtigungen oder die IAM-Rolleroles/storage.objectAdmin
für den angegebenen Staging-Bucket- Außerdem die
storage.buckets.*
-Berechtigungen oder dieroles/storage.Admin
-IAM-Rolle für das Projekt, wenn der Staging-Bucket nicht angegeben ist.
Optional: So lesen und schreiben Sie Daten von und nach BigQuery:
bigquery.tables.*
für Ihre BigQuery-Tabellen.bigquery.readsessions.*
für Ihr Projekt- Die IAM-Rolle
roles/bigquery.admin
enthält die vorherigen Berechtigungen.
Optional: So lesen und schreiben Sie Daten von und zu Cloud Storage:
storage.objects.*
-Berechtigungen oder die IAM-Rolleroles/storage.objectAdmin
für Ihre Cloud Storage-Objekte
Optional: So lesen und schreiben Sie im Staging-Dataset, das für
INOUT/OUT
-Parameter verwendet wird:- IAM-Rolle
bigquery.tables.*
oderroles/bigquery.dataEditor
für das von Ihnen angegebene Staging-Dataset. - Außerdem die Berechtigung
bigquery.datasets.create
oder die IAM-Rolleroles/bigquery.dataEditor
für das Projekt, wenn das Staging-Dataset nicht angegeben ist.
- IAM-Rolle
Beispiele für gespeicherte Prozeduren für Spark
In diesem Abschnitt wird anhand von Beispielen gezeigt, wie Sie ein gespeichertes Verfahren für Apache Spark erstellen.
PySpark- oder JAR-Datei in Cloud Storage verwenden
Das folgende Beispiel zeigt, wie Sie mithilfe der Verbindung my-project-id.us.my-connection
und einer PySpark- oder JAR-Datei, die in einem Cloud Storage-Bucket gespeichert ist, eine gespeicherte Prozedur für Spark erstellen:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java oder Scala
Verwenden Sie main_file_uri
, um eine gespeicherte Prozedur zu erstellen:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Verwenden Sie main_class
, um eine gespeicherte Prozedur zu erstellen:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Inline-Code verwenden
Das folgende Beispiel zeigt, wie Sie eine gespeicherte Prozedur für Spark mithilfe der Verbindung my-project-id.us.my-connection
und mit Inline-PySpark-Code erstellen:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Wert als Eingabeparameter übergeben
In den folgenden Beispielen werden die beiden Methoden zur Übergabe eines Werts als Eingabeparameter in Python gezeigt:
Methode 1: Umgebungsvariablen verwenden
Im PySpark-Code können Sie die Eingabeparameter der gespeicherten Prozedur für Spark über Umgebungsvariablen im Spark-Treiber und in den Executors abrufen. Der Name der Umgebungsvariablen hat das Format BIGQUERY_PROC_PARAM.PARAMETER_NAME
, wobei PARAMETER_NAME
der Name des Eingabeparameters ist. Wenn der Name des Eingabeparameters beispielsweise var
lautet, lautet der Name der entsprechenden Umgebungsvariable BIGQUERY_PROC_PARAM.var
. Die Eingabeparameter sind JSON-codiert.
In Ihrem PySpark-Code können Sie den Wert des Eingabeparameters in einem JSON-String aus der Umgebungsvariablen abrufen und in eine Python-Variable decodieren.
Das folgende Beispiel zeigt, wie Sie den Wert eines Eingabeparameters vom Typ INT64
in Ihren PySpark-Code einfügen:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Methode 2: Integrierte Bibliothek verwenden
Im PySpark-Code können Sie einfach eine integrierte Bibliothek importieren und damit alle Parametertypen ausfüllen. Um die Parameter an Executors zu übergeben, füllen Sie die Parameter in einem Spark-Treiber als Python-Variablen aus und übergeben die Werte an Executors. Die integrierte Bibliothek unterstützt die meisten BigQuery-Datentypen mit Ausnahme von INTERVAL
, GEOGRAPHY
, NUMERIC
und BIGNUMERIC
.
BigQuery-Datentyp | Python-Datentyp |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Nicht unterstützt |
BIGNUMERIC
|
Nicht unterstützt |
INTERVAL
|
Nicht unterstützt |
GEOGRAPHY
|
Nicht unterstützt |
Das folgende Beispiel zeigt, wie Sie die integrierte Bibliothek importieren und dazu verwenden, einen Eingabeparameter vom Typ INT64 und einen Eingabeparameter vom Typ ARRAY<STRUCT<a INT64, b STRING>> in Ihrem PySpark-Code zu füllen:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Im Java- oder Scala-Code können Sie die Eingabeparameter der gespeicherten Prozedur für Spark über Umgebungsvariablen im Spark-Treiber und in den Executors abrufen. Der Name der Umgebungsvariablen hat das Format BIGQUERY_PROC_PARAM.PARAMETER_NAME
, wobei PARAMETER_NAME
der Name des Eingabeparameters ist. Wenn der Name des Eingabeparameters beispielsweise var lautet, lautet der Name der entsprechenden Umgebungsvariable BIGQUERY_PROC_PARAM.var
.
In Ihrem Java- oder Scala-Code können Sie den Wert des Eingabeparameters aus der Umgebungsvariablen abrufen.
Im folgenden Beispiel wird gezeigt, wie der Wert eines Eingabeparameters aus Umgebungsvariablen in den Scala-Code übernommen wird:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
Das folgende Beispiel zeigt, wie Eingabeparameter aus Umgebungsvariablen in Ihren Java-Code eingefügt werden:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Werte als OUT
- und INOUT
-Parameter übergeben
Ausgabeparameter geben den Wert aus der Spark-Prozedur zurück, während der INOUT
-Parameter einen Wert für die Prozedur akzeptiert und einen Wert aus der Prozedur zurückgibt.
Um die OUT
- und INOUT
-Parameter zu verwenden, fügen Sie beim Erstellen der Spark-Prozedur das Schlüsselwort OUT
oder INOUT
vor dem Parameternamen ein. Im PySpark-Code verwenden Sie die integrierte Bibliothek, um einen Wert als OUT
- oder INOUT
-Parameter zurückzugeben. Wie die Eingabeparameter unterstützt die integrierte Bibliothek die meisten BigQuery-Datentypen mit Ausnahme von INTERVAL
, GEOGRAPHY
, NUMERIC
und BIGNUMERIC
. Die Werte des Typs TIME
und DATETIME
werden in die UTC-Zeitzone konvertiert, wenn sie als OUT
- oder INOUT
-Parameter zurückgegeben werden.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Aus einer Hive Metastore-Tabelle lesen und Ergebnisse in BigQuery schreiben
Das folgende Beispiel zeigt, wie Sie eine Hive Metastore-Tabelle transformieren und die Ergebnisse in BigQuery schreiben:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Logfilter aufrufen
Nachdem Sie eine gespeicherte Prozedur für Spark aufgerufen haben, können Sie sich die Loginformationen ansehen. Verwenden Sie den Befehl bq
show
, um die Filterinformationen zum Cloud Logging und zum Spark-Verlaufscluster abzurufen.
Die Filterinformationen sind im Feld SparkStatistics
des untergeordneten Jobs verfügbar. So rufen Sie Logfilter ab:
Rufen Sie die Seite BigQuery auf.
Listen Sie im Abfrageeditor untergeordnete Jobs des Skriptjobs der gespeicherten Prozedur auf:
bq ls -j --parent_job_id=$parent_job_id
Informationen zum Abrufen der Job-ID finden Sie unter Jobdetails ansehen.
Die Ausgabe sieht in etwa so aus:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Ermitteln Sie die
jobId
für Ihre gespeicherte Prozedur und rufen Sie mit dem Befehlbq show
die Details des Jobs auf:bq show --format=prettyjson --job $child_job_id
Kopieren Sie das Feld
sparkStatistics
, da Sie es in einem anderen Schritt benötigen.Die Ausgabe sieht in etwa so aus:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Für Logging generiert Log-Filter mit den Feldern
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Die Logs werden in die überwachte Ressource
bigquery.googleapis.com/SparkJob
geschrieben. Die Logs sind mit den KomponentenINFO
,DRIVER
undEXECUTOR
gekennzeichnet. Fügen Sie den Logfiltern die Komponentelabels.component = "DRIVER"
hinzu, um Logs aus dem Spark-Treiber zu filtern. Fügen Sie den Logfiltern die Komponentelabels.component = "EXECUTOR"
hinzu, um Logs aus dem Spark-Treiber zu filtern.
Kundenverwalteten Verschlüsselungsschlüssel verwenden
Bei der BigQuery-Spark-Prozedur werden Ihre Inhalte mit dem vom Kunden verwalteten Verschlüsselungsschlüssel (CMEK) und der Standardverschlüsselung von BigQuery geschützt. Wenn Sie CMEK in der Spark-Prozedur verwenden möchten, müssen Sie zuerst das BigQuery-Verschlüsselungs-Dienstkonto erstellen und die erforderlichen Berechtigungen gewähren. Das Spark-Verfahren unterstützt auch die CMEK-Organisationsrichtlinien, sofern sie auf Ihr Projekt angewendet werden.
Wenn in Ihrer gespeicherten Prozedur der Sicherheitsmodus INVOKER
verwendet wird, sollte Ihr CMEK beim Aufrufen der Prozedur über die SQL-Systemvariable angegeben werden. Andernfalls kann der CMEK über die mit der gespeicherten Prozedur verknüpfte Verbindung angegeben werden.
Wenn Sie beim Erstellen einer Spark-Speicherprozedur die CMEK über die Verbindung angeben möchten, verwenden Sie den folgenden Beispielcode:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Wenn Sie CMEK beim Aufrufen der Prozedur über die SQL-Systemvariable angeben möchten, verwenden Sie den folgenden Beispielcode:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
VPC Service Controls verwenden
Mit VPC Service Controls können Sie einen sicheren Perimeter einrichten, der vor Daten-Exfiltration schützt. Wenn Sie VPC Service Controls mit einem Spark-Verfahren verwenden möchten, um die Sicherheit zu erhöhen, erstellen Sie zuerst einen Dienstperimeter.
Zum vollständigen Schutz Ihrer Spark-Verfahrensjobs fügen Sie dem Dienstperimeter die folgenden APIs hinzu:
- BigQuery API (
bigquery.googleapis.com
) - Cloud Logging API (
logging.googleapis.com
) - Cloud Storage API (
storage.googleapis.com
), wenn Sie Cloud Storage verwenden - Artifact Registry API (
artifactregistry.googleapis.com
) oder Container Registry API (containerregistry.googleapis.com
), wenn Sie einen benutzerdefinierten Container verwenden - Dataproc Metastore API (
metastore.googleapis.com
) und Cloud Run Admin API (run.googleapis.com
), wenn Sie Dataproc Metastore verwenden
Fügen Sie das Abfrageprojekt des Spark-Verfahrens dem Perimeter hinzu. Fügen Sie dem Perimeter weitere Projekte hinzu, in denen Ihr Spark-Code oder Ihre Daten gehostet werden.
Best Practices
Wenn Sie zum ersten Mal eine Verbindung in Ihrem Projekt verwenden, dauert die Bereitstellung etwa eine Minute zusätzlich. Um Zeit zu sparen, können Sie eine vorhandene Spark-Verbindung wiederverwenden, wenn Sie eine gespeicherte Prozedur für Spark erstellen.
Wenn Sie ein Spark-Verfahren für die Produktion erstellen, empfiehlt Google, eine Laufzeitversion anzugeben. Eine Liste der unterstützten Laufzeitversionen finden Sie unter Dataproc Serverless-Laufzeitversionen. Wir empfehlen die Verwendung der LTS-Version (Long-Term-Support).
Wenn Sie in einem Spark-Verfahren einen benutzerdefinierten Container angeben, empfehlen wir die Verwendung von Artifact Registry und Image-Streaming.
Für eine bessere Leistung können Sie im Spark-Verfahren Ressourcenzuweisungseigenschaften angeben. Spark-Speicherprozeduren unterstützen eine Liste von Ressourcenzuweisungseigenschaften, die mit Dataproc Serverless identisch ist.
Beschränkungen
- Sie können nur das gRPC-Endpunktprotokoll verwenden, um eine Verbindung zu Dataproc Metastore herzustellen. Andere Arten von Hive Metastore werden noch nicht unterstützt.
- Vom Kunden verwaltete Verschlüsselungsschlüssel (CMEK) sind nur verfügbar, wenn Kunden Spark-Vorgänge für eine einzelne Region erstellen. CMEK-Schlüssel für globale Regionen und CMEK-Schlüssel für mehrere Regionen wie
EU
oderUS
werden nicht unterstützt. - Die Übergabe von Ausgabeparametern wird nur für PySpark unterstützt.
- Wenn das mit der gespeicherten Prozedur für Spark verknüpfte Dataset über eine regionenübergreifende Dataset-Replikation in eine Zielregion repliziert wird, kann die gespeicherte Prozedur nur in der Region abgefragt werden, die in der sie erstellt wurde.
- Spark unterstützt keinen Zugriff auf HTTP-Endpunkte in Ihrem privaten VPC Service Controls-Netzwerk.
Kontingente und Limits
Informationen zu Kontingenten und Limits finden Sie unter Kontingente und Limits für gespeicherte Prozeduren für Spark.
Nächste Schritte
- Gespeicherte Prozedur ansehen
- Gespeicherte Prozedur löschen
- Mit einer gespeicherten SQL-Prozedur arbeiten