Bekerja dengan prosedur tersimpan untuk Apache Spark
Dokumen ini ditujukan untuk data engineer, data scientist, dan analis data dalam membuat dan memanggil prosedur tersimpan untuk Spark di BigQuery.
Dengan BigQuery, Anda dapat membuat dan menjalankan prosedur tersimpan Spark yang ditulis dalam Python, Java, dan Scala. Anda kemudian dapat menjalankan prosedur tersimpan ini di BigQuery menggunakan kueri GoogleSQL, mirip seperti menjalankan prosedur tersimpan SQL.
Sebelum memulai
Untuk membuat prosedur tersimpan Spark, minta administrator Anda untuk membuat koneksi Spark dan membagikannya kepada Anda. Administrator juga harus memberikan izin Identity and Access Management (IAM) yang diperlukan untuk akun layanan yang terkait dengan koneksi.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk melakukan tugas dalam dokumen ini, minta administrator Anda untuk memberikan peran IAM berikut:
-
Buat prosedur tersimpan untuk Spark:
-
BigQuery Data Editor (
roles/bigquery.dataEditor
) di set data tempat Anda membuat prosedur tersimpan -
Admin Koneksi BigQuery (
roles/bigquery.connectionAdmin
) di koneksi yang digunakan prosedur tersimpan -
BigQuery Job User (
roles/bigquery.jobUser
) di project Anda
-
BigQuery Data Editor (
-
Memanggil prosedur tersimpan untuk Spark:
-
BigQuery Metadata Viewer (
roles/bigquery.metadataViewer
) pada set data tempat prosedur disimpan -
BigQuery Connection User (
roles/bigquery.connectionUser
) pada koneksi -
BigQuery Job User (
roles/bigquery.jobUser
) di project Anda
-
BigQuery Metadata Viewer (
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran yang telah ditetapkan ini berisi izin yang diperlukan untuk melakukan tugas dalam dokumen ini. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk melakukan tugas dalam dokumen ini:
-
Buat koneksi:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Buat prosedur tersimpan untuk Spark:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Panggil prosedur tersimpan untuk Spark:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Pertimbangan lokasi
Anda harus membuat prosedur tersimpan untuk Spark di lokasi yang sama dengan koneksi Anda karena prosedur tersimpan berjalan di lokasi yang sama dengan koneksi Anda. Misalnya, untuk membuat prosedur tersimpan di multi-region AS, gunakan koneksi yang berada di multi-region AS.
Harga
Biaya untuk menjalankan prosedur Spark di BigQuery mirip dengan biaya untuk menjalankan prosedur Spark di Dataproc Serverless. Untuk mengetahui informasi selengkapnya, lihat Harga Dataproc Serverless.
Prosedur tersimpan Spark dapat digunakan dengan model harga on demand serta dengan edisi BigQuery. Prosedur Spark ditagih menggunakan model bayar sesuai penggunaan edisi BigQuery Enterprise dalam semua kasus, terlepas dari model harga komputasi yang digunakan dalam project Anda.
Prosedur tersimpan Spark untuk BigQuery tidak mendukung penggunaan pemesanan atau komitmen. Pemesanan dan komitmen yang ada akan terus digunakan untuk kueri dan prosedur lain yang didukung. Biaya penggunaan prosedur tersimpan Spark ditambahkan ke tagihan Anda di edisi Enterprise - biaya bayar sesuai penggunaan. Diskon organisasi Anda akan diterapkan, jika berlaku.
Meskipun prosedur tersimpan Spark menggunakan mesin eksekusi Spark, Anda tidak akan melihat tagihan terpisah untuk eksekusi Spark. Seperti yang telah disebutkan, biaya yang sesuai dilaporkan sebagai SKU bayar sesuai penggunaan edisi BigQuery Enterprise.
Prosedur tersimpan Spark tidak menawarkan paket gratis.
Membuat prosedur tersimpan untuk Spark
Anda harus membuat prosedur tersimpan di lokasi yang sama dengan koneksi yang Anda gunakan.
Jika isi prosedur tersimpan lebih dari 1 MB, sebaiknya Anda menempatkan prosedur tersimpan dalam file di bucket Cloud Storage dan bukan dengan kode inline. BigQuery memberikan dua metode untuk membuat prosedur tersimpan Spark menggunakan Python:
- Jika Anda ingin menggunakan pernyataan
CREATE PROCEDURE
, gunakan editor kueri SQL. - Jika Anda ingin mengetik kode Python secara langsung, gunakan editor PySpark. Anda dapat menyimpan kode sebagai prosedur tersimpan.
Menggunakan editor kueri SQL
Untuk membuat prosedur tersimpan Spark di editor kueri SQL, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Di editor kueri, tambahkan kode contoh untuk pernyataan
CREATE PROCEDURE
yang muncul.Atau, di panel Explorer, klik koneksi dalam project yang Anda gunakan untuk membuat aset koneksi. Kemudian, untuk membuat prosedur tersimpan untuk Spark, klik
Buat prosedur tersimpan.Python
Untuk membuat prosedur tersimpan untuk Spark di Python, gunakan kode contoh berikut:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java atau Scala
Untuk membuat prosedur tersimpan Spark di Java atau Scala dengan opsi
main_file_uri
, gunakan kode contoh berikut:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Untuk membuat prosedur tersimpan Spark di Java atau Scala dengan opsi
main_class
danjar_uris
, gunakan kode contoh berikut:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Ganti kode berikut:
PROJECT_ID
: project tempat Anda ingin membuat prosedur tersimpan—misalnya,myproject
.DATASET
: set data tempat Anda ingin membuat prosedur tersimpan—misalnya,mydataset
.PROCEDURE_NAME
: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya,mysparkprocedure
.PROCEDURE_ARGUMENT
: parameter untuk memasukkan argumen input.Dalam parameter ini, tentukan kolom berikut:
ARGUMENT_MODE
: mode argumen.Nilai valid yang mencakup:
IN
,OUT
, danINOUT
. Secara default, nilainya adalahIN
.ARGUMENT_NAME
: nama argumen.ARGUMENT_TYPE
: jenis argumen.
Misalnya:
myproject.mydataset.mysparkproc(num INT64)
.Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter
IN
atau parameterOUT
danINOUT
dalam dokumen ini.CONNECTION_PROJECT_ID
: project yang berisi koneksi untuk menjalankan prosedur Spark.CONNECTION_REGION
: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya,us
.CONNECTION_ID
: ID koneksi—misalnya,myconnection
.Saat Anda melihat detail koneksi di konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir dari ID koneksi yang sepenuhnya memenuhi syarat yang ditampilkan di ID Koneksi—misalnya,
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: versi runtime Spark—misalnya,1.1
.MAIN_PYTHON_FILE_URI
: jalur ke file PySpark—misalnya,gs://mybucket/mypysparkmain.py
.Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan
CREATE PROCEDURE
, tambahkanPYSPARK_CODE
setelahLANGUAGE PYTHON AS
seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.PYSPARK_CODE
: definisi aplikasi PySpark dalam pernyataanCREATE PROCEDURE
jika Anda ingin meneruskan isi prosedur secara inline.Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan
"\n";
dapat direpresentasikan sebagai salah satu dari string berikut:- String kutipan:
"return \"\\n\";"
. Tanda petik dan garis miring terbalik akan di-escape. - String yang dikutip tiga kali:
"""return "\\n";"""
. Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak. - String mentah:
r"""return "\n";"""
. Tidak diperlukan escaping.
- String kutipan:
MAIN_JAR_URI
: jalur file JAR yang berisi classmain
, misalnya,gs://mybucket/my_main.jar
.CLASS_NAME
: nama class yang sepenuhnya memenuhi syarat dalam JAR yang ditetapkan dengan opsijar_uris
, misalnya,com.example.wordcount
.URI
: jalur file JAR yang berisi class yang ditentukan dalam classmain
, misalnya,gs://mybucket/mypysparkmain.jar
.
Untuk opsi tambahan yang dapat Anda masukkan di
OPTIONS
, lihat daftar opsi prosedur.
Menggunakan editor PySpark
Saat membuat prosedur menggunakan editor PySpark, Anda tidak perlu menggunakan pernyataan CREATE PROCEDURE
. Sebagai gantinya, tambahkan kode Python Anda langsung di editor Pyspark dan simpan atau jalankan kode Anda.
Untuk membuat prosedur tersimpan untuk Spark di editor PySpark, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Jika Anda ingin mengetik kode PySpark secara langsung, buka editor PySpark. Untuk membuka editor PySpark, klik menu
di samping Create SQL query, lalu pilih Create PySpark Procedure.Untuk menetapkan opsi, klik More > PySpark Options, lalu lakukan hal berikut:
Tentukan lokasi tempat Anda ingin menjalankan kode PySpark.
Di kolom Connection, tentukan koneksi Spark.
Di bagian Stored procedure invocation, tentukan set data tempat Anda ingin menyimpan prosedur tersimpan sementara yang dihasilkan. Anda dapat menetapkan set data tertentu atau mengizinkan penggunaan set data sementara untuk memanggil kode PySpark.
Set data sementara dibuat dengan lokasi yang ditentukan pada langkah sebelumnya. Jika nama set data disebutkan, pastikan set data dan koneksi Spark berada di lokasi yang sama.
Di bagian Parameter, tentukan parameter untuk prosedur tersimpan. Nilai parameter hanya digunakan selama menjalankan kode PySpark dalam sesi, tetapi deklarasinya disimpan dalam prosedur.
Di bagian Opsi lanjutan, tentukan opsi prosedur. Untuk daftar lengkap opsi prosedur, lihat daftar opsi prosedur.
Di bagian Properties, tambahkan pasangan nilai kunci untuk mengonfigurasi tugas. Anda dapat menggunakan salah satu pasangan nilai kunci dari properti Dataproc Serverless Spark.
Di Setelan akun layanan, tentukan akun layanan kustom,CMEK, set data staging, dan folder Cloud Storage staging yang akan digunakan selama pengoperasian kode PySpark dalam sesi.
Klik Simpan.
Menyimpan prosedur tersimpan untuk Spark
Setelah membuat prosedur tersimpan menggunakan editor PySpark, Anda dapat menyimpan prosedur tersebut. Untuk melakukannya, ikuti langkah-langkah berikut:
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, buat prosedur tersimpan untuk Spark menggunakan Python dengan editor PySpark.
Klik Simpan > Simpan prosedur.
Pada dialog Save stored procedure, sebutkan nama set data tempat Anda ingin menyimpan prosedur dan nama prosedur tersebut.
Klik Simpan.
Jika hanya ingin menjalankan kode PySpark, bukan menyimpannya sebagai prosedur tersimpan, Anda dapat mengklik Run, bukan Save.
Menggunakan container kustom
Container kustom menyediakan lingkungan runtime untuk proses driver dan eksekutor workload. Untuk menggunakan penampung kustom, gunakan kode contoh berikut:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Ganti kode berikut:
PROJECT_ID
: project tempat Anda ingin membuat prosedur tersimpan—misalnya,myproject
.DATASET
: set data tempat Anda ingin membuat prosedur tersimpan—misalnya,mydataset
.PROCEDURE_NAME
: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya,mysparkprocedure
.PROCEDURE_ARGUMENT
: parameter untuk memasukkan argumen input.Dalam parameter ini, tentukan kolom berikut:
ARGUMENT_MODE
: mode argumen.Nilai valid yang mencakup:
IN
,OUT
, danINOUT
. Secara default, nilainya adalahIN
.ARGUMENT_NAME
: nama argumen.ARGUMENT_TYPE
: jenis argumen.
Misalnya:
myproject.mydataset.mysparkproc(num INT64)
.Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter
IN
atau parameterOUT
danINOUT
dalam dokumen ini.CONNECTION_PROJECT_ID
: project yang berisi koneksi untuk menjalankan prosedur Spark.CONNECTION_REGION
: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya,us
.CONNECTION_ID
: ID koneksi, misalnya,myconnection
.Saat Anda melihat detail koneksi di konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir dari ID koneksi yang sepenuhnya memenuhi syarat yang ditampilkan di ID Koneksi—misalnya,
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: versi runtime Spark—misalnya,1.1
.MAIN_PYTHON_FILE_URI
: jalur ke file PySpark—misalnya,gs://mybucket/mypysparkmain.py
.Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan
CREATE PROCEDURE
, tambahkanPYSPARK_CODE
setelahLANGUAGE PYTHON AS
seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.PYSPARK_CODE
: definisi aplikasi PySpark dalam pernyataanCREATE PROCEDURE
jika Anda ingin meneruskan isi prosedur secara inline.Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan
"\n";
dapat direpresentasikan sebagai salah satu dari string berikut:- String kutipan:
"return \"\\n\";"
. Tanda petik dan garis miring terbalik akan di-escape. - String yang dikutip tiga kali:
"""return "\\n";"""
. Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak. - String mentah:
r"""return "\n";"""
. Tidak diperlukan escaping.
- String kutipan:
CONTAINER_IMAGE
: jalur gambar di registry artefak. File ini hanya boleh berisi library yang akan digunakan dalam prosedur Anda. Jika tidak ditentukan, image penampung default sistem yang terkait dengan versi runtime akan digunakan.
Untuk mengetahui informasi selengkapnya tentang cara mem-build image container kustom dengan Spark, lihat Mem-build image container kustom.
Memanggil prosedur tersimpan Spark
Setelah membuat prosedur tersimpan, Anda dapat memanggilnya dengan salah satu opsi berikut:
Konsol
Buka halaman BigQuery.
Di panel Explorer, luaskan project Anda dan pilih prosedur tersimpan untuk Spark yang ingin Anda jalankan.
Di jendela Stored procedure info, klik Invoke stored procedure. Atau, Anda dapat meluaskan opsi View actions, lalu mengklik Invoke.
Klik Run.
Di bagian All results, klik View results.
Opsional: Di bagian Query results, ikuti langkah-langkah berikut ini:
Jika Anda ingin melihat log driver Spark, klik Execution details.
Jika Anda ingin melihat log di Cloud Logging, klik Job information, lalu di kolom Log, klik log
Jika Anda ingin mendapatkan endpoint Spark History Server, klik Job information, lalu klik Spark history server.
SQL
Untuk memanggil prosedur tersimpan, gunakan pernyataan
CALL PROCEDURE
:
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, masukkan pernyataan berikut:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Klik
Run.
Untuk informasi selengkapnya tentang cara menjalankan kueri, lihat Menjalankan kueri interaktif.
Menggunakan akun layanan khusus
Alih-alih menggunakan identitas layanan koneksi Spark untuk akses data, Anda dapat menggunakan akun layanan kustom untuk mengakses data dalam kode Spark.
Untuk menggunakan akun layanan kustom, tentukan mode keamanan INVOKER
(menggunakan pernyataan EXTERNAL SECURITY INVOKER
) saat Anda membuat prosedur tersimpan Spark, dan tentukan akun layanan saat Anda memanggil prosedur tersimpan.
Jika ingin mengakses dan menggunakan kode Spark dari
Cloud Storage, Anda harus memberikan izin yang diperlukan ke
identitas layanan koneksi Spark. Anda harus memberikan izin IAM storage.objects.get
atau peran IAM storage.objectViewer
ke akun layanan koneksi.
Secara opsional, Anda dapat memberikan akses akun layanan koneksi ke Dataproc Metastore dan Dataproc Persistent History Server jika telah menentukannya dalam koneksi. Untuk mengetahui informasi selengkapnya, lihat Memberikan akses ke akun layanan.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Secara opsional, Anda dapat menambahkan argumen berikut ke kode sebelumnya:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Ganti kode berikut:
CUSTOM_SERVICE_ACCOUNT
: Wajib diisi. Akun layanan kustom yang disediakan oleh Anda.BUCKET_NAME
: Opsional. Bucket Cloud Storage yang digunakan sebagai sistem file aplikasi Spark default. Jika tidak disediakan, bucket Cloud Storage default akan dibuat di project Anda dan bucket tersebut akan dibagikan oleh semua tugas yang berjalan dalam project yang sama.DATASET
: Opsional. Set data untuk menyimpan data sementara yang dihasilkan dengan memanggil prosedur. Data akan dihapus setelah tugas selesai. Jika tidak disediakan, set data sementara default akan dibuat untuk tugas.
Akun layanan kustom Anda harus memiliki izin berikut:
Untuk membaca dan menulis ke bucket staging yang digunakan sebagai sistem file aplikasi Spark default:
- Izin
storage.objects.*
atau peran IAMroles/storage.objectAdmin
di bucket staging yang Anda tentukan. - Selain itu, izin
storage.buckets.*
atau peran IAMroles/storage.Admin
di project jika bucket staging tidak ditentukan.
- Izin
(Opsional) Untuk membaca dan menulis data dari dan ke BigQuery:
bigquery.tables.*
di tabel BigQuery Anda.bigquery.readsessions.*
di project Anda.- Peran IAM
roles/bigquery.admin
mencakup izin sebelumnya.
(Opsional) Untuk membaca dan menulis data dari dan ke Cloud Storage:
- Izin
storage.objects.*
atau peran IAMroles/storage.objectAdmin
pada objek Cloud Storage Anda.
- Izin
(Opsional) Untuk membaca dan menulis ke set data staging yang digunakan untuk parameter
INOUT/OUT
:- Peran IAM
bigquery.tables.*
atauroles/bigquery.dataEditor
di set data staging yang Anda tentukan. - Selain itu, izin
bigquery.datasets.create
atau peran IAMroles/bigquery.dataEditor
di project jika set data staging tidak ditentukan.
- Peran IAM
Contoh prosedur tersimpan untuk Spark
Bagian ini menunjukkan contoh pembuatan prosedur tersimpan untuk Apache Spark.
Menggunakan PySpark atau file JAR di Cloud Storage
Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark
menggunakan koneksi my-project-id.us.my-connection
dan PySpark atau file JAR yang disimpan di bucket Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java atau Scala
Gunakan main_file_uri
untuk membuat prosedur tersimpan:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Gunakan main_class
untuk membuat prosedur tersimpan:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Menggunakan kode inline
Contoh berikut menunjukkan cara membuat prosedur tersimpan
untuk Spark menggunakan my-project-id.us.my-connection
koneksi dan
kode PySpark inline:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Meneruskan nilai sebagai parameter input
Contoh berikut menampilkan dua metode untuk meneruskan nilai sebagai parameter input di Python:
Metode 1: Menggunakan variabel lingkungan
Dalam kode PySpark, Anda dapat memperoleh parameter input prosedur tersimpan
untuk Spark melalui variabel lingkungan di driver dan
eksekutor Spark. Nama variabel lingkungan memiliki format
BIGQUERY_PROC_PARAM.PARAMETER_NAME
,
dengan PARAMETER_NAME
sebagai nama parameter input. Misalnya,
jika nama parameter input adalah var
, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var
. Parameter
input dienkode JSON.
Di kode PySpark, Anda bisa mendapatkan nilai parameter input dalam string
JSON dari variabel lingkungan dan mendekodenya ke variabel Python.
Contoh berikut menunjukkan cara mendapatkan nilai parameter input jenis
INT64
ke dalam kode PySpark Anda:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Metode 2: Menggunakan library bawaan
Dalam kode PySpark, Anda cukup mengimpor library bawaan dan menggunakannya untuk
mengisi semua jenis parameter. Untuk meneruskan parameter ke eksekutor, isi
parameter di driver Spark sebagai variabel Python dan teruskan nilainya ke
eksekutor. Library bawaan mendukung sebagian besar jenis data BigQuery,
kecuali INTERVAL
, GEOGRAPHY
, NUMERIC
, dan BIGNUMERIC
.
Jenis data BigQuery | Jenis data Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Tidak Didukung |
BIGNUMERIC
|
Tidak Didukung |
INTERVAL
|
Tidak Didukung |
GEOGRAPHY
|
Tidak Didukung |
Contoh berikut menunjukkan cara mengimpor library bawaan dan menggunakannya untuk mengisi parameter input jenis INT64 dan parameter input jenis ARRAY<STRUCT<a INT64, b STRING>> ke kode PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Dalam kode Java atau Scala, Anda dapat memperoleh parameter input dari prosedur
tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format
BIGQUERY_PROC_PARAM.PARAMETER_NAME
, dengan PARAMETER_NAME
sebagai nama
parameter input. Misalnya, jika nama parameter input adalah var,
nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var
.
Di kode Java atau Scala, Anda bisa mendapatkan nilai parameter input dari
variabel lingkungan.
Contoh berikut menunjukkan cara mendapatkan nilai parameter input dari variabel lingkungan ke dalam kode Scala Anda:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
Contoh berikut menunjukkan cara mendapatkan parameter input dari variabel lingkungan ke dalam kode Java Anda:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Meneruskan nilai sebagai parameter OUT
dan INOUT
Parameter output menampilkan nilai dari prosedur Spark, sedangkan parameter
INOUT
menerima nilai untuk prosedur dan menampilkan nilai dari prosedur.
Untuk menggunakan parameter OUT
dan INOUT
, tambahkan kata kunci OUT
atau INOUT
sebelum nama parameter saat membuat prosedur Spark. Dalam kode
PySpark, Anda menggunakan library bawaan untuk menampilkan nilai sebagai parameter OUT
atau
INOUT
. Sama seperti parameter input, library bawaan mendukung sebagian besar
jenis data BigQuery kecuali INTERVAL
, GEOGRAPHY
, NUMERIC
,
dan BIGNUMERIC
. Jenis nilai TIME
dan DATETIME
dikonversi ke zona waktu UTC
saat ditampilkan sebagai parameter OUT
atau INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Membaca dari tabel Hive Metastore dan menulis hasilnya ke BigQuery
Contoh berikut menunjukkan cara mengubah tabel Hive Metastore dan menulis hasilnya ke BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Melihat filter log
Setelah memanggil prosedur tersimpan untuk Spark, Anda
dapat melihat informasi log. Untuk mendapatkan informasi filter Cloud Logging
dan endpoint Cluster Histori Spark, gunakan perintah bq
show
.
Informasi filter tersedia di kolom SparkStatistics
dari
tugas turunan. Untuk mendapatkan filter log, ikuti langkah-langkah berikut:
Buka halaman BigQuery.
Di editor kueri, cantumkan tugas turunan dari tugas skrip prosedur tersimpan:
bq ls -j --parent_job_id=$parent_job_id
Untuk mempelajari cara mendapatkan ID tugas, lihat Melihat detail tugas.
Outputnya mirip seperti ini:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifikasi
jobId
untuk prosedur tersimpan Anda dan gunakan perintahbq show
untuk melihat detail tugas:bq show --format=prettyjson --job $child_job_id
Salin kolom
sparkStatistics
karena Anda akan memerlukannya untuk langkah lain.Outputnya mirip seperti ini:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Untuk Logging, buat filter log dengan kolom
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Log ditulis dalam resource yang dimonitor
bigquery.googleapis.com/SparkJob
. Log diberi label berdasarkan komponenINFO
,DRIVER
, danEXECUTOR
. Untuk memfilter log dari driver Spark, tambahkan komponenlabels.component = "DRIVER"
ke filter log. Untuk memfilter log dari eksekutor Spark, tambahkan komponenlabels.component = "EXECUTOR"
ke filter log.
Menggunakan kunci enkripsi yang dikelola pelanggan
Prosedur BigQuery Spark menggunakan kunci enkripsi yang dikelola pelanggan (CMEK) untuk melindungi konten Anda, bersama dengan enkripsi default yang disediakan oleh BigQuery. Untuk menggunakan CMEK dalam prosedur Spark, picu pembuatan akun layanan enkripsi BigQuery dan berikan izin yang diperlukan terlebih dahulu. Prosedur Spark juga mendukung kebijakan organisasi CMEK jika diterapkan ke project Anda.
Jika prosedur tersimpan Anda menggunakan mode keamanan INVOKER
, CMEK Anda harus ditentukan melalui variabel sistem SQL saat memanggil prosedur. Jika tidak, CMEK Anda dapat ditentukan melalui koneksi yang terkait dengan prosedur tersimpan.
Untuk menentukan CMEK melalui koneksi saat Anda membuat prosedur tersimpan Spark, gunakan kode contoh berikut:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Untuk menentukan CMEK melalui variabel sistem SQL saat memanggil prosedur, gunakan kode contoh berikut:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Menggunakan Kontrol Layanan VPC
Kontrol Layanan VPC memungkinkan Anda menyiapkan perimeter yang aman untuk mencegah pemindahan data yang tidak sah. Agar dapat menggunakan Kontrol Layanan VPC dengan prosedur Spark untuk keamanan tambahan, buat perimeter layanan terlebih dahulu.
Untuk sepenuhnya melindungi tugas prosedur Spark Anda, tambahkan API berikut ke perimeter layanan:
- BigQuery API (
bigquery.googleapis.com
) - Cloud Logging API (
logging.googleapis.com
) - Cloud Storage API (
storage.googleapis.com
), jika Anda menggunakan Cloud Storage - Artifact Registry API (
artifactregistry.googleapis.com
) atau Container Registry API (containerregistry.googleapis.com
), jika Anda menggunakan penampung kustom - Dataproc Metastore API (
metastore.googleapis.com
) dan Cloud Run Admin API (run.googleapis.com
), jika Anda menggunakan Dataproc Metastore
Tambahkan project kueri prosedur spark ke dalam perimeter. Tambahkan project lain yang menghosting kode atau data Spark Anda ke perimeter.
Praktik terbaik
Saat Anda menggunakan koneksi dalam project untuk pertama kalinya, penyediaan memerlukan waktu satu menit lebih lama. Untuk menghemat waktu, Anda dapat menggunakan kembali koneksi Spark yang ada saat membuat prosedur tersimpan untuk Spark.
Saat Anda membuat prosedur Spark untuk penggunaan produksi, Google merekomendasikan untuk menentukan versi runtime. Untuk daftar versi runtime yang didukung, lihat Versi runtime Dataproc Serverless. Sebaiknya gunakan versi Dukungan Jangka Panjang (LTS).
Saat Anda menentukan penampung kustom dalam prosedur Spark, sebaiknya gunakan Artifact Registry dan streaming image.
Untuk performa yang lebih baik, Anda dapat menentukan properti alokasi resource dalam prosedur Spark. Prosedur tersimpan Spark mendukung daftar properti alokasi resource yang sama dengan Dataproc Serverless.
Batasan
- Anda hanya bisa menggunakan protokol endpoint gRPC untuk terhubung ke Dataproc Metastore. Jenis Hive Metastore lainnya belum didukung.
- Kunci enkripsi yang dikelola pelanggan (CMEK) hanya tersedia saat pelanggan membuat prosedur Spark satu region. Kunci CMEK region global dan kunci CMEK multi-region, misalnya,
EU
atauUS
, tidak didukung. - Meneruskan parameter output hanya didukung untuk PySpark.
- Jika set data yang terkait dengan prosedur tersimpan untuk Spark direplikasi ke region tujuan melalui replikasi set data lintas region, prosedur tersimpan hanya dapat dikueri di region tempat kode dibuat.
- Spark tidak mendukung akses ke endpoint HTTP di jaringan Kontrol Layanan VPC pribadi Anda.
Kuota dan batas
Untuk mendapatkan informasi tentang kuota dan batas, lihat prosedur tersimpan untuk kuota dan batas Spark.
Langkah berikutnya
- Pelajari cara melihat prosedur tersimpan.
- Pelajari cara menghapus prosedur tersimpan.
- Pelajari cara bekerja dengan prosedur yang tersimpan di SQL.