Bekerja dengan prosedur tersimpan untuk Apache Spark
Dokumen ini ditujukan bagi data engineer, data scientist, dan analis data untuk membuat dan memanggil prosedur tersimpan untuk Spark di BigQuery.
Dengan BigQuery, Anda dapat membuat dan menjalankan prosedur tersimpan Spark yang ditulis dalam Python, Java, dan Scala. Anda kemudian dapat menjalankan prosedur tersimpan ini di BigQuery menggunakan kueri GoogleSQL, mirip seperti menjalankan prosedur tersimpan SQL.
Sebelum memulai
Untuk membuat prosedur tersimpan Spark, minta administrator Anda untuk membuat koneksi Spark dan membagikannya kepada Anda. Administrator juga harus memberikan izin Identity and Access Management (IAM) yang diperlukan untuk akun layanan yang terkait dengan koneksi.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk melakukan tugas dalam dokumen ini, minta administrator Anda untuk memberikan peran IAM berikut:
-
Buat prosedur tersimpan untuk Spark:
-
BigQuery Data Editor (
roles/bigquery.dataEditor
) di set data tempat Anda membuat prosedur tersimpan -
Admin Koneksi BigQuery (
roles/bigquery.connectionAdmin
) di koneksi yang digunakan prosedur tersimpan -
BigQuery Job User (
roles/bigquery.jobUser
) di project Anda
-
BigQuery Data Editor (
-
Memanggil prosedur tersimpan untuk Spark:
-
BigQuery Metadata Viewer (
roles/bigquery.metadataViewer
) pada set data tempat prosedur disimpan -
BigQuery Connection User (
roles/bigquery.connectionUser
) pada koneksi -
BigQuery Job User (
roles/bigquery.jobUser
) di project Anda
-
BigQuery Metadata Viewer (
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses.
Peran yang telah ditetapkan ini berisi izin yang diperlukan untuk melakukan tugas dalam dokumen ini. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk melakukan tugas dalam dokumen ini:
-
Buat koneksi:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Buat prosedur tersimpan untuk Spark:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Panggil prosedur tersimpan untuk Spark:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Anda juga dapat mendapatkan izin-izin ini dengan peran khusus atau peran yang telah ditetapkan sebelumnya.
Pertimbangan lokasi
Anda harus membuat prosedur tersimpan untuk Spark di lokasi yang sama dengan koneksi Anda karena prosedur tersimpan berjalan di lokasi yang sama dengan koneksi Anda. Misalnya, untuk membuat prosedur tersimpan di multi-region AS, gunakan koneksi yang berada di multi-region AS.
Harga
Biaya untuk menjalankan prosedur Spark di BigQuery serupa dengan biaya untuk menjalankan prosedur Spark pada Dataproc Serverless. Untuk mengetahui informasi selengkapnya, lihat harga Dataproc Serverless.
Prosedur yang disimpan di Spark dapat digunakan dengan model harga on demand serta dengan edisi BigQuery. Prosedur Spark dikenai biaya menggunakan model bayar sesuai penggunaan edisi BigQuery Enterprise dalam semua kasus, terlepas dari model harga komputasi yang digunakan dalam project Anda.
Prosedur yang disimpan oleh Spark untuk BigQuery tidak mendukung penggunaan reservasi atau komitmen. Reservasi dan komitmen yang ada akan terus digunakan untuk kueri dan prosedur lain yang didukung. Biaya penggunaan prosedur Spark yang tersimpan ditambahkan ke tagihan Anda di edisi Enterprise - biaya bayar sesuai penggunaan. Diskon organisasi Anda akan diterapkan, jika berlaku.
Meskipun prosedur yang disimpan oleh Spark menggunakan mesin eksekusi Spark, Anda tidak akan melihat biaya terpisah untuk eksekusi Spark. Seperti yang telah disebutkan, biaya yang sesuai dilaporkan sebagai SKU bayar sesuai penggunaan edisi BigQuery Enterprise.
Prosedur tersimpan di Spark tidak menawarkan paket gratis.
Membuat prosedur tersimpan untuk Spark
Anda harus membuat prosedur tersimpan di lokasi yang sama dengan koneksi yang Anda gunakan.
Jika isi prosedur tersimpan lebih dari 1 MB, sebaiknya Anda menempatkan prosedur tersimpan dalam file di bucket Cloud Storage dan bukan dengan kode inline. BigQuery memberikan dua metode untuk membuat prosedur tersimpan Spark menggunakan Python:
- Jika Anda ingin menggunakan pernyataan
CREATE PROCEDURE
, gunakan editor kueri SQL. - Jika Anda ingin mengetik kode Python secara langsung, gunakan editor PySpark. Anda dapat menyimpan kode sebagai prosedur tersimpan.
Menggunakan editor kueri SQL
Untuk membuat prosedur tersimpan untuk Spark di editor kueri SQL, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Di editor kueri, tambahkan kode contoh untuk pernyataan
CREATE PROCEDURE
yang muncul.Atau, di panel Explorer, klik koneksi dalam project yang Anda gunakan untuk membuat aset koneksi. Kemudian, untuk membuat prosedur tersimpan untuk Spark, klik
Buat prosedur tersimpan.Python
Untuk membuat prosedur tersimpan untuk Spark di Python, gunakan kode contoh berikut:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java atau Scala
Untuk membuat prosedur tersimpan bagi Spark di Java atau Scala dengan opsi
main_file_uri
, gunakan kode contoh berikut:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Untuk membuat prosedur tersimpan bagi Spark di Java atau Scala dengan opsi
main_class
danjar_uris
, gunakan kode contoh berikut:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Ganti kode berikut:
PROJECT_ID
: project tempat Anda ingin membuat prosedur tersimpan—misalnya,myproject
.DATASET
: set data tempat Anda ingin membuat prosedur tersimpan—misalnya,mydataset
.PROCEDURE_NAME
: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya,mysparkprocedure
.PROCEDURE_ARGUMENT
: parameter untuk memasukkan argumen input.Dalam parameter ini, tentukan kolom berikut:
ARGUMENT_MODE
: mode argumen.Nilai valid yang mencakup:
IN
,OUT
, danINOUT
. Secara default, nilainya adalahIN
.ARGUMENT_NAME
: nama argumen.ARGUMENT_TYPE
: jenis argumen.
Misalnya:
myproject.mydataset.mysparkproc(num INT64)
.Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter
IN
atau parameterOUT
danINOUT
dalam dokumen ini.CONNECTION_PROJECT_ID
: project yang berisi koneksi untuk menjalankan prosedur Spark.CONNECTION_REGION
: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya,us
.CONNECTION_ID
: ID koneksi—misalnya,myconnection
.Saat Anda melihat detail koneksi di Konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir ID koneksi yang sepenuhnya memenuhi syarat, yang ditampilkan di Connection ID—misalnya
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: versi runtime Spark—misalnya,1.1
.MAIN_PYTHON_FILE_URI
: jalur ke file PySpark—misalnya,gs://mybucket/mypysparkmain.py
.Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan
CREATE PROCEDURE
, tambahkanPYSPARK_CODE
setelahLANGUAGE PYTHON AS
seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.PYSPARK_CODE
: definisi aplikasi PySpark dalam pernyataanCREATE PROCEDURE
jika Anda ingin meneruskan isi prosedur secara inline.Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan
"\n";
dapat direpresentasikan sebagai salah satu dari string berikut:- String kutipan:
"return \"\\n\";"
. Tanda petik dan garis miring terbalik akan di-escape. - String yang dikutip tiga kali:
"""return "\\n";"""
. Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak. - String mentah:
r"""return "\n";"""
. Tidak diperlukan escaping.
- String kutipan:
MAIN_JAR_URI
: jalur file JAR yang berisi classmain
, misalnya,gs://mybucket/my_main.jar
.CLASS_NAME
: nama class yang sepenuhnya memenuhi syarat dalam JAR yang ditetapkan dengan opsijar_uris
, misalnya,com.example.wordcount
.URI
: jalur file JAR yang berisi class yang ditentukan dalam classmain
, misalnya,gs://mybucket/mypysparkmain.jar
.
Untuk opsi tambahan yang dapat Anda masukkan di
OPTIONS
, lihat daftar opsi prosedur.
Menggunakan editor PySpark
Saat membuat prosedur menggunakan editor PySpark, Anda tidak perlu menggunakan
pernyataan CREATE PROCEDURE
. Sebagai gantinya, tambahkan kode Python langsung di editor Pyspark, lalu simpan atau jalankan kode Anda.
Untuk membuat prosedur tersimpan untuk Spark di editor PySpark, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Jika Anda ingin langsung mengetik kode PySpark, buka editor PySpark. Untuk membuka editor PySpark, klik menu
di samping Create SQL query, lalu pilih Create PySpark Procedure.Untuk menetapkan opsi, klik More > PySpark Options, lalu lakukan hal berikut:
Tentukan lokasi tempat Anda ingin menjalankan kode PySpark.
Di kolom Connection, tentukan koneksi Spark.
Di bagian Stored procedure invocation, tentukan set data tempat Anda ingin menyimpan prosedur tersimpan sementara yang dihasilkan. Anda dapat menetapkan set data tertentu atau mengizinkan penggunaan set data sementara untuk memanggil kode PySpark.
Set data sementara dibuat dengan lokasi yang ditentukan pada langkah sebelumnya. Jika nama set data disebutkan, pastikan set data dan koneksi Spark berada di lokasi yang sama.
Di bagian Parameters, tentukan parameter untuk prosedur yang disimpan. Nilai parameter hanya digunakan selama menjalankan kode PySpark dalam sesi, tetapi deklarasinya disimpan dalam prosedur.
Di bagian Opsi lanjutan, tentukan opsi prosedur. Untuk daftar lengkap opsi prosedur, lihat daftar opsi prosedur.
Di bagian Properties, tambahkan pasangan nilai kunci untuk mengonfigurasi tugas. Anda dapat menggunakan salah satu key-value pair dari properti Dataproc Serverless Spark.
Di bagian Service account settings, tentukan akun layanan kustom, CMEK, set data staging, dan folder Cloud Storage staging yang akan digunakan selama menjalankan kode PySpark dalam sesi.
Klik Save.
Menyimpan prosedur tersimpan untuk Spark
Setelah membuat prosedur tersimpan menggunakan editor PySpark, Anda dapat menyimpan prosedur tersebut. Untuk melakukannya, ikuti langkah-langkah berikut:
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, buat prosedur tersimpan untuk Spark menggunakan Python dengan editor PySpark.
Klik Simpan > Simpan prosedur.
Pada dialog Save stored procedure, sebutkan nama set data tempat Anda ingin menyimpan prosedur dan nama prosedur tersebut.
Klik Save.
Jika hanya ingin menjalankan kode PySpark, bukan menyimpannya sebagai prosedur tersimpan, Anda dapat mengklik Run, bukan Save.
Menggunakan container kustom
Penampung kustom menyediakan lingkungan runtime untuk proses driver dan eksekutor workload. Untuk menggunakan container kustom, gunakan kode contoh berikut:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Ganti kode berikut:
PROJECT_ID
: project tempat Anda ingin membuat prosedur tersimpan—misalnya,myproject
.DATASET
: set data tempat Anda ingin membuat prosedur tersimpan—misalnya,mydataset
.PROCEDURE_NAME
: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya,mysparkprocedure
.PROCEDURE_ARGUMENT
: parameter untuk memasukkan argumen input.Dalam parameter ini, tentukan kolom berikut:
ARGUMENT_MODE
: mode argumen.Nilai valid yang mencakup:
IN
,OUT
, danINOUT
. Secara default, nilainya adalahIN
.ARGUMENT_NAME
: nama argumen.ARGUMENT_TYPE
: jenis argumen.
Misalnya:
myproject.mydataset.mysparkproc(num INT64)
.Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter
IN
atau parameterOUT
danINOUT
dalam dokumen ini.CONNECTION_PROJECT_ID
: project yang berisi koneksi untuk menjalankan prosedur Spark.CONNECTION_REGION
: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya,us
.CONNECTION_ID
: ID koneksi, misalnya,myconnection
.Saat Anda melihat detail koneksi di Konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir ID koneksi yang sepenuhnya memenuhi syarat, yang ditampilkan di Connection ID—misalnya
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: versi runtime Spark—misalnya,1.1
.MAIN_PYTHON_FILE_URI
: jalur ke file PySpark—misalnya,gs://mybucket/mypysparkmain.py
.Atau, jika Anda ingin menambahkan isi prosedur yang disimpan dalam pernyataan
CREATE PROCEDURE
, tambahkanPYSPARK_CODE
setelahLANGUAGE PYTHON AS
seperti yang ditunjukkan dalam contoh di Gunakan kode inline dalam dokumen ini.PYSPARK_CODE
: definisi aplikasi PySpark dalam pernyataanCREATE PROCEDURE
jika Anda ingin meneruskan isi prosedur secara inline.Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau dinyatakan sebagai string mentah. Misalnya, kode yang menampilkan
"\n";
dapat direpresentasikan sebagai salah satu dari berikut:- String kutipan:
"return \"\\n\";"
. Tanda kutip dan garis miring terbalik akan di-escape. - String yang dikutip tiga kali:
"""return "\\n";"""
. Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak. - String mentah:
r"""return "\n";"""
. Tidak diperlukan escaping.
- String kutipan:
CONTAINER_IMAGE
: jalur image di artifacts registry. File ini hanya boleh berisi library yang akan digunakan dalam prosedur Anda. Jika tidak ditentukan, image container default sistem yang terkait dengan versi runtime akan digunakan.
Untuk mengetahui informasi selengkapnya tentang cara mem-build image container kustom dengan Spark, lihat Membangun image container kustom.
Memanggil prosedur tersimpan Spark
Setelah membuat prosedur tersimpan, Anda dapat memanggilnya dengan salah satu opsi berikut:
Konsol
Buka halaman BigQuery.
Di panel Explorer, luaskan project Anda dan pilih prosedur tersimpan untuk Spark yang ingin Anda jalankan.
Di jendela Stored procedure info, klik Invoke stored procedure. Atau, Anda dapat meluaskan opsi View actions, lalu mengklik Invoke.
Klik Run.
Di bagian All results, klik View results.
Opsional: Di bagian Query results, ikuti langkah-langkah berikut ini:
Jika Anda ingin melihat log driver Spark, klik Execution details.
Jika Anda ingin melihat log di Cloud Logging, klik Job information, lalu di kolom Log, klik log
Jika Anda ingin mendapatkan endpoint Spark History Server, klik Job information, lalu klik Spark history server.
SQL
Untuk memanggil prosedur tersimpan, gunakan pernyataan
CALL PROCEDURE
:
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, masukkan pernyataan berikut:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Klik
Run.
Untuk informasi selengkapnya tentang cara menjalankan kueri, lihat Menjalankan kueri interaktif.
Menggunakan akun layanan khusus
Daripada menggunakan identitas layanan koneksi Spark untuk akses data, Anda dapat menggunakan akun layanan kustom untuk mengakses data dalam kode Spark Anda.
Untuk menggunakan akun layanan kustom, tentukan mode keamanan INVOKER
(menggunakan
pernyataan EXTERNAL SECURITY INVOKER
) saat Anda membuat
prosedur yang disimpan Spark, dan tentukan akun layanan
saat Anda memanggil prosedur yang disimpan.
Jika ingin mengakses dan menggunakan kode Spark dari Cloud Storage, Anda harus memberikan izin yang diperlukan untuk mengidentifikasi layanan koneksi Spark. Anda perlu memberikan izin IAM storage.objects.get
atau peran IAM storage.objectViewer
ke akun layanan koneksi.
Secara opsional, Anda dapat memberikan akses akun layanan koneksi ke Dataproc Metastore dan Dataproc Persistent History Server jika Anda telah menentukannya dalam koneksi. Untuk mengetahui informasi selengkapnya, lihat Memberikan akses ke akun layanan.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Secara opsional, Anda dapat menambahkan argumen berikut ke kode sebelumnya:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Ganti kode berikut:
CUSTOM_SERVICE_ACCOUNT
: Wajib diisi. Akun layanan kustom yang Anda sediakan.BUCKET_NAME
: Opsional. Bucket Cloud Storage yang digunakan sebagai sistem file aplikasi Spark default. Jika tidak disediakan, bucket Cloud Storage default akan dibuat di project Anda dan bucket tersebut akan digunakan bersama oleh semua tugas yang berjalan dalam project yang sama.DATASET
: Opsional. Set data untuk menyimpan data sementara yang dihasilkan dengan memanggil prosedur. Data tersebut akan dibersihkan setelah pekerjaan selesai. Jika atribut ini tidak diberikan, set data sementara default akan dibuat untuk tugas tersebut.
Akun layanan kustom Anda harus memiliki izin berikut:
Untuk membaca dan menulis ke bucket staging yang digunakan sebagai sistem file aplikasi Spark default:
- Izin
storage.objects.*
atau peran IAMroles/storage.objectAdmin
pada bucket staging yang Anda tentukan. - Selain itu, izin
storage.buckets.*
atau peran IAMroles/storage.Admin
pada project jika bucket staging tidak ditentukan.
- Izin
(Opsional) Untuk membaca dan menulis data dari dan ke BigQuery:
bigquery.tables.*
pada tabel BigQuery.bigquery.readsessions.*
di project Anda.- Peran IAM
roles/bigquery.admin
mencakup izin sebelumnya.
(Opsional) Untuk membaca dan menulis data dari dan ke Cloud Storage:
- Izin
storage.objects.*
atau peran IAMroles/storage.objectAdmin
di objek Cloud Storage Anda.
- Izin
(Opsional) Untuk membaca dan menulis ke set data staging yang digunakan untuk parameter
INOUT/OUT
:- Peran IAM
bigquery.tables.*
atauroles/bigquery.dataEditor
pada set data staging yang Anda tentukan. - Selain itu, izin
bigquery.datasets.create
atau peran IAMroles/bigquery.dataEditor
pada project jika set data staging tidak ditentukan.
- Peran IAM
Contoh prosedur tersimpan untuk Spark
Bagian ini menunjukkan contoh pembuatan prosedur tersimpan untuk Apache Spark.
Menggunakan PySpark atau file JAR di Cloud Storage
Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark
menggunakan koneksi my-project-id.us.my-connection
dan PySpark atau file JAR yang disimpan di bucket Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java atau Scala
Gunakan main_file_uri
untuk membuat prosedur tersimpan:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Gunakan main_class
untuk membuat prosedur tersimpan:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Menggunakan kode inline
Contoh berikut menunjukkan cara membuat prosedur tersimpan
untuk Spark menggunakan my-project-id.us.my-connection
koneksi dan
kode PySpark inline:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Meneruskan nilai sebagai parameter input
Contoh berikut menampilkan dua metode untuk meneruskan nilai sebagai parameter input di Python:
Metode 1: Menggunakan variabel lingkungan
Dalam kode PySpark, Anda dapat memperoleh parameter input prosedur tersimpan
untuk Spark melalui variabel lingkungan di driver dan
eksekutor Spark. Nama variabel lingkungan memiliki format
BIGQUERY_PROC_PARAM.PARAMETER_NAME
,
dengan PARAMETER_NAME
sebagai nama parameter input. Misalnya,
jika nama parameter input adalah var
, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var
. Parameter
input dienkode JSON.
Di kode PySpark, Anda bisa mendapatkan nilai parameter input dalam string
JSON dari variabel lingkungan dan mendekodenya ke variabel Python.
Contoh berikut menunjukkan cara mendapatkan nilai parameter input jenis
INT64
ke dalam kode PySpark Anda:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Metode 2: Menggunakan library bawaan
Dalam kode PySpark, Anda cukup mengimpor library bawaan dan menggunakannya untuk
mengisi semua jenis parameter. Untuk meneruskan parameter ke eksekutor, isi
parameter di driver Spark sebagai variabel Python dan teruskan nilainya ke
eksekutor. Library bawaan mendukung sebagian besar jenis data BigQuery,
kecuali INTERVAL
, GEOGRAPHY
, NUMERIC
, dan BIGNUMERIC
.
Jenis data BigQuery | Jenis data Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Tidak Didukung |
BIGNUMERIC
|
Tidak Didukung |
INTERVAL
|
Tidak Didukung |
GEOGRAPHY
|
Tidak Didukung |
Contoh berikut menunjukkan cara mengimpor library bawaan dan menggunakannya untuk mengisi parameter input jenis INT64 dan parameter input jenis ARRAY<STRUCT<a INT64, b STRING>> ke kode PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Dalam kode Java atau Scala, Anda dapat memperoleh parameter input dari prosedur
tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format
BIGQUERY_PROC_PARAM.PARAMETER_NAME
, dengan PARAMETER_NAME
sebagai nama
parameter input. Misalnya, jika nama parameter input adalah var,
nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var
.
Di kode Java atau Scala, Anda bisa mendapatkan nilai parameter input dari
variabel lingkungan.
Contoh berikut menunjukkan cara mendapatkan nilai parameter input dari variabel lingkungan ke dalam kode Scala Anda:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
Contoh berikut menunjukkan cara mendapatkan parameter input dari variabel lingkungan ke dalam kode Java Anda:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Meneruskan nilai sebagai parameter OUT
dan INOUT
Parameter output menampilkan nilai dari prosedur Spark, sedangkan parameter
INOUT
menerima nilai untuk prosedur dan menampilkan nilai dari prosedur.
Untuk menggunakan parameter OUT
dan INOUT
, tambahkan kata kunci OUT
atau INOUT
sebelum nama parameter saat membuat prosedur Spark. Dalam kode
PySpark, Anda menggunakan library bawaan untuk menampilkan nilai sebagai parameter OUT
atau
INOUT
. Sama seperti parameter input, library bawaan mendukung sebagian besar
jenis data BigQuery kecuali INTERVAL
, GEOGRAPHY
, NUMERIC
,
dan BIGNUMERIC
. Jenis nilai TIME
dan DATETIME
dikonversi ke zona waktu UTC
saat ditampilkan sebagai parameter OUT
atau INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Membaca dari tabel Hive Metastore dan menulis hasilnya ke BigQuery
Contoh berikut menunjukkan cara mengubah tabel Hive Metastore dan menulis hasilnya ke BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Melihat filter log
Setelah memanggil prosedur tersimpan untuk Spark, Anda
dapat melihat informasi log. Untuk mendapatkan informasi filter Cloud Logging
dan endpoint Cluster Histori Spark, gunakan perintah bq
show
.
Informasi filter tersedia di kolom SparkStatistics
dari
tugas turunan. Untuk mendapatkan filter log, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Di editor kueri, cantumkan tugas turunan dari tugas skrip prosedur tersimpan:
bq ls -j --parent_job_id=$parent_job_id
Untuk mempelajari cara mendapatkan ID tugas, lihat Melihat detail tugas.
Outputnya mirip dengan hal berikut ini:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifikasi
jobId
untuk prosedur tersimpan Anda dan gunakan perintahbq show
untuk melihat detail tugas:bq show --format=prettyjson --job $child_job_id
Salin kolom
sparkStatistics
karena Anda akan memerlukannya untuk langkah lain.Outputnya mirip dengan hal berikut ini:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Untuk Logging, buat filter log dengan kolom
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Log ditulis dalam resource yang dimonitor
bigquery.googleapis.com/SparkJob
. Log diberi label berdasarkan komponenINFO
,DRIVER
, danEXECUTOR
. Untuk memfilter log dari driver Spark, tambahkan komponenlabels.component = "DRIVER"
ke filter log. Untuk memfilter log dari eksekutor Spark, tambahkan komponenlabels.component = "EXECUTOR"
ke filter log.
Menggunakan kunci enkripsi yang dikelola pelanggan
Prosedur BigQuery Spark menggunakan kunci enkripsi yang dikelola pelanggan (Customer-Managed Encryption Key/CMEK) untuk melindungi konten Anda, bersama dengan enkripsi default yang disediakan oleh BigQuery. Untuk menggunakan CMEK dalam prosedur Spark, picu pembuatan akun layanan enkripsi BigQuery dan berikan izin yang diperlukan terlebih dahulu. Prosedur Spark juga mendukung kebijakan organisasi CMEK jika diterapkan ke project Anda.
Jika prosedur yang tersimpan menggunakan mode keamanan INVOKER
, CMEK Anda harus ditentukan melalui variabel sistem SQL saat memanggil prosedur. Jika tidak, CMEK Anda dapat ditentukan melalui koneksi yang terkait dengan prosedur yang disimpan.
Untuk menentukan CMEK melalui koneksi saat Anda membuat prosedur yang disimpan di Spark, gunakan kode contoh berikut:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Untuk menentukan CMEK melalui variabel sistem SQL saat memanggil prosedur, gunakan kode contoh berikut:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Menggunakan Kontrol Layanan VPC
Kontrol Layanan VPC dapat digunakan untuk menyiapkan perimeter yang aman untuk mencegah pemindahan data yang tidak sah. Untuk menggunakan Kontrol Layanan VPC dengan prosedur Spark guna mendapatkan keamanan tambahan, buat perimeter layanan terlebih dahulu.
Untuk melindungi tugas prosedur Spark sepenuhnya, tambahkan API berikut ke perimeter layanan:
- BigQuery API (
bigquery.googleapis.com
) - Cloud Logging API (
logging.googleapis.com
) - Cloud Storage API (
storage.googleapis.com
), jika Anda menggunakan Cloud Storage - Artifact Registry API (
artifactregistry.googleapis.com
) atau Container Registry API (containerregistry.googleapis.com
), jika Anda menggunakan penampung kustom - Dataproc Metastore API (
metastore.googleapis.com
) dan Cloud Run Admin API (run.googleapis.com
), jika Anda menggunakan Metastore Dataproc
Tambahkan project kueri prosedur spark ke perimeter. Tambahkan project lain yang menghosting kode atau data Spark Anda ke dalam perimeter.
Praktik terbaik
Saat Anda menggunakan koneksi di project untuk pertama kalinya, perlu waktu sekitar satu menit tambahan untuk penyediaan. Untuk menghemat waktu, Anda dapat menggunakan kembali koneksi Spark yang ada saat membuat prosedur tersimpan untuk Spark.
Saat Anda membuat prosedur Spark untuk penggunaan produksi, Google merekomendasikan untuk menentukan versi runtime. Untuk daftar versi runtime yang didukung, lihat Versi runtime Dataproc Serverless. Sebaiknya gunakan versi Dukungan Lama (LTS).
Saat menentukan container kustom dalam prosedur Spark, sebaiknya gunakan Artifact Registry dan streaming image.
Untuk mendapatkan performa yang lebih baik, Anda dapat menentukan properti alokasi resource dalam prosedur Spark. Prosedur yang disimpan Spark mendukung daftar properti alokasi resource yang sama seperti Dataproc Serverless.
Batasan
- Anda hanya bisa menggunakan protokol endpoint gRPC untuk terhubung ke Dataproc Metastore. Jenis Hive Metastore lainnya belum didukung.
- Kunci enkripsi yang dikelola pelanggan (CMEK) hanya tersedia saat pelanggan membuat prosedur Spark dengan satu region. Kunci CMEK wilayah global dan kunci CMEK multi-region, misalnya,
EU
atauUS
, tidak didukung. - Meneruskan parameter output hanya didukung untuk PySpark.
- Jika set data yang terkait dengan prosedur tersimpan untuk Spark direplikasi ke region tujuan melalui replikasi set data lintas region, prosedur tersimpan hanya dapat dikueri di region tempat kode dibuat.
Kuota dan batas
Untuk mendapatkan informasi tentang kuota dan batas, lihat prosedur tersimpan untuk kuota dan batas Spark.
Langkah selanjutnya
- Pelajari cara melihat prosedur tersimpan.
- Pelajari cara menghapus prosedur tersimpan.
- Pelajari cara bekerja dengan prosedur yang tersimpan di SQL.