Bekerja dengan prosedur tersimpan untuk Apache Spark

Dokumen ini ditujukan bagi data engineer, data scientist, dan analis data untuk membuat dan memanggil prosedur tersimpan untuk Spark di BigQuery.

Dengan BigQuery, Anda dapat membuat dan menjalankan prosedur tersimpan Spark yang ditulis dalam Python, Java, dan Scala. Anda kemudian dapat menjalankan prosedur tersimpan ini di BigQuery menggunakan kueri GoogleSQL, mirip seperti menjalankan prosedur tersimpan SQL.

Sebelum memulai

Untuk membuat prosedur tersimpan Spark, minta administrator Anda untuk membuat koneksi Spark dan membagikannya kepada Anda. Administrator juga harus memberikan izin Identity and Access Management (IAM) yang diperlukan untuk akun layanan yang terkait dengan koneksi.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk melakukan tugas dalam dokumen ini, minta administrator Anda untuk memberikan peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses.

Peran yang telah ditetapkan ini berisi izin yang diperlukan untuk melakukan tugas dalam dokumen ini. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk melakukan tugas dalam dokumen ini:

  • Buat koneksi:
    • bigquery.connections.create
    • bigquery.connections.list
  • Buat prosedur tersimpan untuk Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Panggil prosedur tersimpan untuk Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Anda juga dapat mendapatkan izin-izin ini dengan peran khusus atau peran yang telah ditetapkan sebelumnya.

Pertimbangan lokasi

Anda harus membuat prosedur tersimpan untuk Spark di lokasi yang sama dengan koneksi Anda karena prosedur tersimpan berjalan di lokasi yang sama dengan koneksi Anda. Misalnya, untuk membuat prosedur tersimpan di multi-region AS, gunakan koneksi yang berada di multi-region AS.

Harga

  • Biaya untuk menjalankan prosedur Spark di BigQuery serupa dengan biaya untuk menjalankan prosedur Spark pada Dataproc Serverless. Untuk mengetahui informasi selengkapnya, lihat harga Dataproc Serverless.

  • Prosedur yang disimpan di Spark dapat digunakan dengan model harga on demand serta dengan edisi BigQuery. Prosedur Spark dikenai biaya menggunakan model bayar sesuai penggunaan edisi BigQuery Enterprise dalam semua kasus, terlepas dari model harga komputasi yang digunakan dalam project Anda.

  • Prosedur yang disimpan oleh Spark untuk BigQuery tidak mendukung penggunaan reservasi atau komitmen. Reservasi dan komitmen yang ada akan terus digunakan untuk kueri dan prosedur lain yang didukung. Biaya penggunaan prosedur Spark yang tersimpan ditambahkan ke tagihan Anda di edisi Enterprise - biaya bayar sesuai penggunaan. Diskon organisasi Anda akan diterapkan, jika berlaku.

  • Meskipun prosedur yang disimpan oleh Spark menggunakan mesin eksekusi Spark, Anda tidak akan melihat biaya terpisah untuk eksekusi Spark. Seperti yang telah disebutkan, biaya yang sesuai dilaporkan sebagai SKU bayar sesuai penggunaan edisi BigQuery Enterprise.

  • Prosedur tersimpan di Spark tidak menawarkan paket gratis.

Membuat prosedur tersimpan untuk Spark

Anda harus membuat prosedur tersimpan di lokasi yang sama dengan koneksi yang Anda gunakan.

Jika isi prosedur tersimpan lebih dari 1 MB, sebaiknya Anda menempatkan prosedur tersimpan dalam file di bucket Cloud Storage dan bukan dengan kode inline. BigQuery memberikan dua metode untuk membuat prosedur tersimpan Spark menggunakan Python:

Menggunakan editor kueri SQL

Untuk membuat prosedur tersimpan untuk Spark di editor kueri SQL, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, tambahkan kode contoh untuk pernyataan CREATE PROCEDURE yang muncul.

    Atau, di panel Explorer, klik koneksi dalam project yang Anda gunakan untuk membuat aset koneksi. Kemudian, untuk membuat prosedur tersimpan untuk Spark, klik Buat prosedur tersimpan.

    Python

    Untuk membuat prosedur tersimpan untuk Spark di Python, gunakan kode contoh berikut:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java atau Scala

    Untuk membuat prosedur tersimpan bagi Spark di Java atau Scala dengan opsi main_file_uri, gunakan kode contoh berikut:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Untuk membuat prosedur tersimpan bagi Spark di Java atau Scala dengan opsi main_class dan jar_uris, gunakan kode contoh berikut:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Ganti kode berikut:

    • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan—misalnya, myproject.
    • DATASET: set data tempat Anda ingin membuat prosedur tersimpan—misalnya, mydataset.
    • PROCEDURE_NAME: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya, mysparkprocedure.
    • PROCEDURE_ARGUMENT: parameter untuk memasukkan argumen input.

      Dalam parameter ini, tentukan kolom berikut:

      • ARGUMENT_MODE: mode argumen.

        Nilai valid yang mencakup: IN, OUT, dan INOUT. Secara default, nilainya adalah IN.

      • ARGUMENT_NAME: nama argumen.
      • ARGUMENT_TYPE: jenis argumen.

      Misalnya: myproject.mydataset.mysparkproc(num INT64).

      Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter IN atau parameter OUT dan INOUT dalam dokumen ini.

    • CONNECTION_PROJECT_ID: project yang berisi koneksi untuk menjalankan prosedur Spark.
    • CONNECTION_REGION: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya, us.
    • CONNECTION_ID: ID koneksi—misalnya, myconnection.

      Saat Anda melihat detail koneksi di Konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir ID koneksi yang sepenuhnya memenuhi syarat, yang ditampilkan di Connection ID—misalnya projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: versi runtime Spark—misalnya, 1.1.
    • MAIN_PYTHON_FILE_URI: jalur ke file PySpark—misalnya, gs://mybucket/mypysparkmain.py.

      Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan CREATE PROCEDURE, tambahkan PYSPARK_CODE setelah LANGUAGE PYTHON AS seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.

    • PYSPARK_CODE: definisi aplikasi PySpark dalam pernyataan CREATE PROCEDURE jika Anda ingin meneruskan isi prosedur secara inline.

      Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan "\n"; dapat direpresentasikan sebagai salah satu dari string berikut:

      • String kutipan: "return \"\\n\";". Tanda petik dan garis miring terbalik akan di-escape.
      • String yang dikutip tiga kali: """return "\\n";""". Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak.
      • String mentah: r"""return "\n";""". Tidak diperlukan escaping.
      Untuk mempelajari cara menambahkan kode PySpark inline, lihat Menggunakan kode inline.
    • MAIN_JAR_URI: jalur file JAR yang berisi class main, misalnya, gs://mybucket/my_main.jar.
    • CLASS_NAME: nama class yang sepenuhnya memenuhi syarat dalam JAR yang ditetapkan dengan opsi jar_uris, misalnya, com.example.wordcount.
    • URI: jalur file JAR yang berisi class yang ditentukan dalam class main, misalnya, gs://mybucket/mypysparkmain.jar.

    Untuk opsi tambahan yang dapat Anda masukkan di OPTIONS, lihat daftar opsi prosedur.

Menggunakan editor PySpark

Saat membuat prosedur menggunakan editor PySpark, Anda tidak perlu menggunakan pernyataan CREATE PROCEDURE. Sebagai gantinya, tambahkan kode Python langsung di editor Pyspark, lalu simpan atau jalankan kode Anda.

Untuk membuat prosedur tersimpan untuk Spark di editor PySpark, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Jika Anda ingin langsung mengetik kode PySpark, buka editor PySpark. Untuk membuka editor PySpark, klik menu di samping Create SQL query, lalu pilih Create PySpark Procedure.

  3. Untuk menetapkan opsi, klik More > PySpark Options, lalu lakukan hal berikut:

    1. Tentukan lokasi tempat Anda ingin menjalankan kode PySpark.

    2. Di kolom Connection, tentukan koneksi Spark.

    3. Di bagian Stored procedure invocation, tentukan set data tempat Anda ingin menyimpan prosedur tersimpan sementara yang dihasilkan. Anda dapat menetapkan set data tertentu atau mengizinkan penggunaan set data sementara untuk memanggil kode PySpark.

      Set data sementara dibuat dengan lokasi yang ditentukan pada langkah sebelumnya. Jika nama set data disebutkan, pastikan set data dan koneksi Spark berada di lokasi yang sama.

    4. Di bagian Parameters, tentukan parameter untuk prosedur yang disimpan. Nilai parameter hanya digunakan selama menjalankan kode PySpark dalam sesi, tetapi deklarasinya disimpan dalam prosedur.

    5. Di bagian Opsi lanjutan, tentukan opsi prosedur. Untuk daftar lengkap opsi prosedur, lihat daftar opsi prosedur.

    6. Di bagian Properties, tambahkan pasangan nilai kunci untuk mengonfigurasi tugas. Anda dapat menggunakan salah satu key-value pair dari properti Dataproc Serverless Spark.

    7. Di bagian Service account settings, tentukan akun layanan kustom, CMEK, set data staging, dan folder Cloud Storage staging yang akan digunakan selama menjalankan kode PySpark dalam sesi.

    8. Klik Save.

Menyimpan prosedur tersimpan untuk Spark

Setelah membuat prosedur tersimpan menggunakan editor PySpark, Anda dapat menyimpan prosedur tersebut. Untuk melakukannya, ikuti langkah-langkah berikut:

  1. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, buat prosedur tersimpan untuk Spark menggunakan Python dengan editor PySpark.

  3. Klik Simpan > Simpan prosedur.

  4. Pada dialog Save stored procedure, sebutkan nama set data tempat Anda ingin menyimpan prosedur dan nama prosedur tersebut.

  5. Klik Save.

    Jika hanya ingin menjalankan kode PySpark, bukan menyimpannya sebagai prosedur tersimpan, Anda dapat mengklik Run, bukan Save.

Menggunakan container kustom

Penampung kustom menyediakan lingkungan runtime untuk proses driver dan eksekutor workload. Untuk menggunakan container kustom, gunakan kode contoh berikut:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Ganti kode berikut:

  • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan—misalnya, myproject.
  • DATASET: set data tempat Anda ingin membuat prosedur tersimpan—misalnya, mydataset.
  • PROCEDURE_NAME: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya, mysparkprocedure.
  • PROCEDURE_ARGUMENT: parameter untuk memasukkan argumen input.

    Dalam parameter ini, tentukan kolom berikut:

    • ARGUMENT_MODE: mode argumen.

      Nilai valid yang mencakup: IN, OUT, dan INOUT. Secara default, nilainya adalah IN.

    • ARGUMENT_NAME: nama argumen.
    • ARGUMENT_TYPE: jenis argumen.

    Misalnya: myproject.mydataset.mysparkproc(num INT64).

    Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter IN atau parameter OUT dan INOUT dalam dokumen ini.

  • CONNECTION_PROJECT_ID: project yang berisi koneksi untuk menjalankan prosedur Spark.
  • CONNECTION_REGION: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya, us.
  • CONNECTION_ID: ID koneksi, misalnya, myconnection.

    Saat Anda melihat detail koneksi di Konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir ID koneksi yang sepenuhnya memenuhi syarat, yang ditampilkan di Connection ID—misalnya projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: versi runtime Spark—misalnya, 1.1.
  • MAIN_PYTHON_FILE_URI: jalur ke file PySpark—misalnya, gs://mybucket/mypysparkmain.py.

    Atau, jika Anda ingin menambahkan isi prosedur yang disimpan dalam pernyataan CREATE PROCEDURE, tambahkan PYSPARK_CODE setelah LANGUAGE PYTHON AS seperti yang ditunjukkan dalam contoh di Gunakan kode inline dalam dokumen ini.

  • PYSPARK_CODE: definisi aplikasi PySpark dalam pernyataan CREATE PROCEDURE jika Anda ingin meneruskan isi prosedur secara inline.

    Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau dinyatakan sebagai string mentah. Misalnya, kode yang menampilkan "\n"; dapat direpresentasikan sebagai salah satu dari berikut:

    • String kutipan: "return \"\\n\";". Tanda kutip dan garis miring terbalik akan di-escape.
    • String yang dikutip tiga kali: """return "\\n";""". Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak.
    • String mentah: r"""return "\n";""". Tidak diperlukan escaping.
    Untuk mempelajari cara menambahkan kode PySpark inline, lihat Menggunakan kode inline.
  • CONTAINER_IMAGE: jalur image di artifacts registry. File ini hanya boleh berisi library yang akan digunakan dalam prosedur Anda. Jika tidak ditentukan, image container default sistem yang terkait dengan versi runtime akan digunakan.

Untuk mengetahui informasi selengkapnya tentang cara mem-build image container kustom dengan Spark, lihat Membangun image container kustom.

Memanggil prosedur tersimpan Spark

Setelah membuat prosedur tersimpan, Anda dapat memanggilnya dengan salah satu opsi berikut:

Konsol

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di panel Explorer, luaskan project Anda dan pilih prosedur tersimpan untuk Spark yang ingin Anda jalankan.

  3. Di jendela Stored procedure info, klik Invoke stored procedure. Atau, Anda dapat meluaskan opsi View actions, lalu mengklik Invoke.

  4. Klik Run.

  5. Di bagian All results, klik View results.

  6. Opsional: Di bagian Query results, ikuti langkah-langkah berikut ini:

    • Jika Anda ingin melihat log driver Spark, klik Execution details.

    • Jika Anda ingin melihat log di Cloud Logging, klik Job information, lalu di kolom Log, klik log

    • Jika Anda ingin mendapatkan endpoint Spark History Server, klik Job information, lalu klik Spark history server.

SQL

Untuk memanggil prosedur tersimpan, gunakan pernyataan CALL PROCEDURE:

  1. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, masukkan pernyataan berikut:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Klik Run.

Untuk informasi selengkapnya tentang cara menjalankan kueri, lihat Menjalankan kueri interaktif.

Menggunakan akun layanan khusus

Daripada menggunakan identitas layanan koneksi Spark untuk akses data, Anda dapat menggunakan akun layanan kustom untuk mengakses data dalam kode Spark Anda.

Untuk menggunakan akun layanan kustom, tentukan mode keamanan INVOKER (menggunakan pernyataan EXTERNAL SECURITY INVOKER) saat Anda membuat prosedur yang disimpan Spark, dan tentukan akun layanan saat Anda memanggil prosedur yang disimpan.

Jika ingin mengakses dan menggunakan kode Spark dari Cloud Storage, Anda harus memberikan izin yang diperlukan untuk mengidentifikasi layanan koneksi Spark. Anda perlu memberikan izin IAM storage.objects.get atau peran IAM storage.objectViewer ke akun layanan koneksi.

Secara opsional, Anda dapat memberikan akses akun layanan koneksi ke Dataproc Metastore dan Dataproc Persistent History Server jika Anda telah menentukannya dalam koneksi. Untuk mengetahui informasi selengkapnya, lihat Memberikan akses ke akun layanan.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Secara opsional, Anda dapat menambahkan argumen berikut ke kode sebelumnya:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Ganti kode berikut:

  • CUSTOM_SERVICE_ACCOUNT: Wajib diisi. Akun layanan kustom yang Anda sediakan.
  • BUCKET_NAME: Opsional. Bucket Cloud Storage yang digunakan sebagai sistem file aplikasi Spark default. Jika tidak disediakan, bucket Cloud Storage default akan dibuat di project Anda dan bucket tersebut akan digunakan bersama oleh semua tugas yang berjalan dalam project yang sama.
  • DATASET: Opsional. Set data untuk menyimpan data sementara yang dihasilkan dengan memanggil prosedur. Data tersebut akan dibersihkan setelah pekerjaan selesai. Jika atribut ini tidak diberikan, set data sementara default akan dibuat untuk tugas tersebut.

Akun layanan kustom Anda harus memiliki izin berikut:

  • Untuk membaca dan menulis ke bucket staging yang digunakan sebagai sistem file aplikasi Spark default:

    • Izin storage.objects.* atau peran IAM roles/storage.objectAdmin pada bucket staging yang Anda tentukan.
    • Selain itu, izin storage.buckets.* atau peran IAM roles/storage.Admin pada project jika bucket staging tidak ditentukan.
  • (Opsional) Untuk membaca dan menulis data dari dan ke BigQuery:

    • bigquery.tables.* pada tabel BigQuery.
    • bigquery.readsessions.* di project Anda.
    • Peran IAM roles/bigquery.admin mencakup izin sebelumnya.
  • (Opsional) Untuk membaca dan menulis data dari dan ke Cloud Storage:

    • Izin storage.objects.* atau peran IAM roles/storage.objectAdmin di objek Cloud Storage Anda.
  • (Opsional) Untuk membaca dan menulis ke set data staging yang digunakan untuk parameter INOUT/OUT:

    • Peran IAM bigquery.tables.* atau roles/bigquery.dataEditor pada set data staging yang Anda tentukan.
    • Selain itu, izin bigquery.datasets.create atau peran IAM roles/bigquery.dataEditor pada project jika set data staging tidak ditentukan.

Contoh prosedur tersimpan untuk Spark

Bagian ini menunjukkan contoh pembuatan prosedur tersimpan untuk Apache Spark.

Menggunakan PySpark atau file JAR di Cloud Storage

Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark menggunakan koneksi my-project-id.us.my-connection dan PySpark atau file JAR yang disimpan di bucket Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java atau Scala

Gunakan main_file_uri untuk membuat prosedur tersimpan:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Gunakan main_class untuk membuat prosedur tersimpan:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Menggunakan kode inline

Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark menggunakan my-project-id.us.my-connection koneksi dan kode PySpark inline:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Meneruskan nilai sebagai parameter input

Contoh berikut menampilkan dua metode untuk meneruskan nilai sebagai parameter input di Python:

Metode 1: Menggunakan variabel lingkungan

Dalam kode PySpark, Anda dapat memperoleh parameter input prosedur tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format BIGQUERY_PROC_PARAM.PARAMETER_NAME, dengan PARAMETER_NAME sebagai nama parameter input. Misalnya, jika nama parameter input adalah var, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var. Parameter input dienkode JSON. Di kode PySpark, Anda bisa mendapatkan nilai parameter input dalam string JSON dari variabel lingkungan dan mendekodenya ke variabel Python.

Contoh berikut menunjukkan cara mendapatkan nilai parameter input jenis INT64 ke dalam kode PySpark Anda:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Metode 2: Menggunakan library bawaan

Dalam kode PySpark, Anda cukup mengimpor library bawaan dan menggunakannya untuk mengisi semua jenis parameter. Untuk meneruskan parameter ke eksekutor, isi parameter di driver Spark sebagai variabel Python dan teruskan nilainya ke eksekutor. Library bawaan mendukung sebagian besar jenis data BigQuery, kecuali INTERVAL, GEOGRAPHY, NUMERIC, dan BIGNUMERIC.

Jenis data BigQuery Jenis data Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Tidak Didukung
BIGNUMERIC Tidak Didukung
INTERVAL Tidak Didukung
GEOGRAPHY Tidak Didukung

Contoh berikut menunjukkan cara mengimpor library bawaan dan menggunakannya untuk mengisi parameter input jenis INT64 dan parameter input jenis ARRAY<STRUCT<a INT64, b STRING>> ke kode PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Dalam kode Java atau Scala, Anda dapat memperoleh parameter input dari prosedur tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format BIGQUERY_PROC_PARAM.PARAMETER_NAME, dengan PARAMETER_NAME sebagai nama parameter input. Misalnya, jika nama parameter input adalah var, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var. Di kode Java atau Scala, Anda bisa mendapatkan nilai parameter input dari variabel lingkungan.

Contoh berikut menunjukkan cara mendapatkan nilai parameter input dari variabel lingkungan ke dalam kode Scala Anda:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

Contoh berikut menunjukkan cara mendapatkan parameter input dari variabel lingkungan ke dalam kode Java Anda:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Meneruskan nilai sebagai parameter OUT dan INOUT

Parameter output menampilkan nilai dari prosedur Spark, sedangkan parameter INOUT menerima nilai untuk prosedur dan menampilkan nilai dari prosedur. Untuk menggunakan parameter OUT dan INOUT, tambahkan kata kunci OUT atau INOUT sebelum nama parameter saat membuat prosedur Spark. Dalam kode PySpark, Anda menggunakan library bawaan untuk menampilkan nilai sebagai parameter OUT atau INOUT. Sama seperti parameter input, library bawaan mendukung sebagian besar jenis data BigQuery kecuali INTERVAL, GEOGRAPHY, NUMERIC, dan BIGNUMERIC. Jenis nilai TIME dan DATETIME dikonversi ke zona waktu UTC saat ditampilkan sebagai parameter OUT atau INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Membaca dari tabel Hive Metastore dan menulis hasilnya ke BigQuery

Contoh berikut menunjukkan cara mengubah tabel Hive Metastore dan menulis hasilnya ke BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Melihat filter log

Setelah memanggil prosedur tersimpan untuk Spark, Anda dapat melihat informasi log. Untuk mendapatkan informasi filter Cloud Logging dan endpoint Cluster Histori Spark, gunakan perintah bq show. Informasi filter tersedia di kolom SparkStatistics dari tugas turunan. Untuk mendapatkan filter log, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, cantumkan tugas turunan dari tugas skrip prosedur tersimpan:

    bq ls -j --parent_job_id=$parent_job_id

    Untuk mempelajari cara mendapatkan ID tugas, lihat Melihat detail tugas.

    Outputnya mirip dengan hal berikut ini:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifikasi jobId untuk prosedur tersimpan Anda dan gunakan perintah bq show untuk melihat detail tugas:

    bq show --format=prettyjson --job $child_job_id

    Salin kolom sparkStatistics karena Anda akan memerlukannya untuk langkah lain.

    Outputnya mirip dengan hal berikut ini:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Untuk Logging, buat filter log dengan kolom SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Log ditulis dalam resource yang dimonitor bigquery.googleapis.com/SparkJob. Log diberi label berdasarkan komponen INFO, DRIVER, dan EXECUTOR. Untuk memfilter log dari driver Spark, tambahkan komponen labels.component = "DRIVER" ke filter log. Untuk memfilter log dari eksekutor Spark, tambahkan komponen labels.component = "EXECUTOR" ke filter log.

Menggunakan kunci enkripsi yang dikelola pelanggan

Prosedur BigQuery Spark menggunakan kunci enkripsi yang dikelola pelanggan (Customer-Managed Encryption Key/CMEK) untuk melindungi konten Anda, bersama dengan enkripsi default yang disediakan oleh BigQuery. Untuk menggunakan CMEK dalam prosedur Spark, picu pembuatan akun layanan enkripsi BigQuery dan berikan izin yang diperlukan terlebih dahulu. Prosedur Spark juga mendukung kebijakan organisasi CMEK jika diterapkan ke project Anda.

Jika prosedur yang tersimpan menggunakan mode keamanan INVOKER, CMEK Anda harus ditentukan melalui variabel sistem SQL saat memanggil prosedur. Jika tidak, CMEK Anda dapat ditentukan melalui koneksi yang terkait dengan prosedur yang disimpan.

Untuk menentukan CMEK melalui koneksi saat Anda membuat prosedur yang disimpan di Spark, gunakan kode contoh berikut:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Untuk menentukan CMEK melalui variabel sistem SQL saat memanggil prosedur, gunakan kode contoh berikut:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Menggunakan Kontrol Layanan VPC

Kontrol Layanan VPC dapat digunakan untuk menyiapkan perimeter yang aman untuk mencegah pemindahan data yang tidak sah. Untuk menggunakan Kontrol Layanan VPC dengan prosedur Spark guna mendapatkan keamanan tambahan, buat perimeter layanan terlebih dahulu.

Untuk melindungi tugas prosedur Spark sepenuhnya, tambahkan API berikut ke perimeter layanan:

  • BigQuery API (bigquery.googleapis.com)
  • Cloud Logging API (logging.googleapis.com)
  • Cloud Storage API (storage.googleapis.com), jika Anda menggunakan Cloud Storage
  • Artifact Registry API (artifactregistry.googleapis.com) atau Container Registry API (containerregistry.googleapis.com), jika Anda menggunakan penampung kustom
  • Dataproc Metastore API (metastore.googleapis.com) dan Cloud Run Admin API (run.googleapis.com), jika Anda menggunakan Metastore Dataproc

Tambahkan project kueri prosedur spark ke perimeter. Tambahkan project lain yang menghosting kode atau data Spark Anda ke dalam perimeter.

Praktik terbaik

  • Saat Anda menggunakan koneksi di project untuk pertama kalinya, perlu waktu sekitar satu menit tambahan untuk penyediaan. Untuk menghemat waktu, Anda dapat menggunakan kembali koneksi Spark yang ada saat membuat prosedur tersimpan untuk Spark.

  • Saat Anda membuat prosedur Spark untuk penggunaan produksi, Google merekomendasikan untuk menentukan versi runtime. Untuk daftar versi runtime yang didukung, lihat Versi runtime Dataproc Serverless. Sebaiknya gunakan versi Dukungan Lama (LTS).

  • Saat menentukan container kustom dalam prosedur Spark, sebaiknya gunakan Artifact Registry dan streaming image.

  • Untuk mendapatkan performa yang lebih baik, Anda dapat menentukan properti alokasi resource dalam prosedur Spark. Prosedur yang disimpan Spark mendukung daftar properti alokasi resource yang sama seperti Dataproc Serverless.

Batasan

  • Anda hanya bisa menggunakan protokol endpoint gRPC untuk terhubung ke Dataproc Metastore. Jenis Hive Metastore lainnya belum didukung.
  • Kunci enkripsi yang dikelola pelanggan (CMEK) hanya tersedia saat pelanggan membuat prosedur Spark dengan satu region. Kunci CMEK wilayah global dan kunci CMEK multi-region, misalnya, EU atau US, tidak didukung.
  • Meneruskan parameter output hanya didukung untuk PySpark.
  • Jika set data yang terkait dengan prosedur tersimpan untuk Spark direplikasi ke region tujuan melalui replikasi set data lintas region, prosedur tersimpan hanya dapat dikueri di region tempat kode dibuat.

Kuota dan batas

Untuk mendapatkan informasi tentang kuota dan batas, lihat prosedur tersimpan untuk kuota dan batas Spark.

Langkah selanjutnya