Bekerja dengan prosedur tersimpan untuk Apache Spark

Dokumen ini ditujukan untuk data engineer, data scientist, dan analis data dalam membuat dan memanggil prosedur tersimpan untuk Spark di BigQuery.

Dengan BigQuery, Anda dapat membuat dan menjalankan prosedur tersimpan Spark yang ditulis dalam Python, Java, dan Scala. Anda kemudian dapat menjalankan prosedur tersimpan ini di BigQuery menggunakan kueri GoogleSQL, mirip seperti menjalankan prosedur tersimpan SQL.

Sebelum memulai

Untuk membuat prosedur tersimpan Spark, minta administrator Anda untuk membuat koneksi Spark dan membagikannya kepada Anda. Administrator juga harus memberikan izin Identity and Access Management (IAM) yang diperlukan untuk akun layanan yang terkait dengan koneksi.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk melakukan tugas dalam dokumen ini, minta administrator Anda untuk memberikan peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran yang telah ditetapkan ini berisi izin yang diperlukan untuk melakukan tugas dalam dokumen ini. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk melakukan tugas dalam dokumen ini:

  • Buat koneksi:
    • bigquery.connections.create
    • bigquery.connections.list
  • Buat prosedur tersimpan untuk Spark:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Panggil prosedur tersimpan untuk Spark:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Pertimbangan lokasi

Anda harus membuat prosedur tersimpan untuk Spark di lokasi yang sama dengan koneksi Anda karena prosedur tersimpan berjalan di lokasi yang sama dengan koneksi Anda. Misalnya, untuk membuat prosedur tersimpan di multi-region AS, gunakan koneksi yang berada di multi-region AS.

Harga

  • Biaya untuk menjalankan prosedur Spark di BigQuery mirip dengan biaya untuk menjalankan prosedur Spark di Dataproc Serverless. Untuk mengetahui informasi selengkapnya, lihat Harga Dataproc Serverless.

  • Prosedur tersimpan Spark dapat digunakan dengan model harga on demand serta dengan edisi BigQuery. Prosedur Spark ditagih menggunakan model bayar sesuai penggunaan edisi BigQuery Enterprise dalam semua kasus, terlepas dari model harga komputasi yang digunakan dalam project Anda.

  • Prosedur tersimpan Spark untuk BigQuery tidak mendukung penggunaan pemesanan atau komitmen. Pemesanan dan komitmen yang ada akan terus digunakan untuk kueri dan prosedur lain yang didukung. Biaya penggunaan prosedur tersimpan Spark ditambahkan ke tagihan Anda di edisi Enterprise - biaya bayar sesuai penggunaan. Diskon organisasi Anda akan diterapkan, jika berlaku.

  • Meskipun prosedur tersimpan Spark menggunakan mesin eksekusi Spark, Anda tidak akan melihat tagihan terpisah untuk eksekusi Spark. Seperti yang telah disebutkan, biaya yang sesuai dilaporkan sebagai SKU bayar sesuai penggunaan edisi BigQuery Enterprise.

  • Prosedur tersimpan Spark tidak menawarkan paket gratis.

Membuat prosedur tersimpan untuk Spark

Anda harus membuat prosedur tersimpan di lokasi yang sama dengan koneksi yang Anda gunakan.

Jika isi prosedur tersimpan lebih dari 1 MB, sebaiknya Anda menempatkan prosedur tersimpan dalam file di bucket Cloud Storage dan bukan dengan kode inline. BigQuery memberikan dua metode untuk membuat prosedur tersimpan Spark menggunakan Python:

Menggunakan editor kueri SQL

Untuk membuat prosedur tersimpan Spark di editor kueri SQL, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, tambahkan kode contoh untuk pernyataan CREATE PROCEDURE yang muncul.

    Atau, di panel Explorer, klik koneksi dalam project yang Anda gunakan untuk membuat aset koneksi. Kemudian, untuk membuat prosedur tersimpan untuk Spark, klik Buat prosedur tersimpan.

    Python

    Untuk membuat prosedur tersimpan untuk Spark di Python, gunakan kode contoh berikut:

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java atau Scala

    Untuk membuat prosedur tersimpan Spark di Java atau Scala dengan opsi main_file_uri, gunakan kode contoh berikut:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Untuk membuat prosedur tersimpan Spark di Java atau Scala dengan opsi main_class dan jar_uris, gunakan kode contoh berikut:

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Ganti kode berikut:

    • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan—misalnya, myproject.
    • DATASET: set data tempat Anda ingin membuat prosedur tersimpan—misalnya, mydataset.
    • PROCEDURE_NAME: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya, mysparkprocedure.
    • PROCEDURE_ARGUMENT: parameter untuk memasukkan argumen input.

      Dalam parameter ini, tentukan kolom berikut:

      • ARGUMENT_MODE: mode argumen.

        Nilai valid yang mencakup: IN, OUT, dan INOUT. Secara default, nilainya adalah IN.

      • ARGUMENT_NAME: nama argumen.
      • ARGUMENT_TYPE: jenis argumen.

      Misalnya: myproject.mydataset.mysparkproc(num INT64).

      Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter IN atau parameter OUT dan INOUT dalam dokumen ini.

    • CONNECTION_PROJECT_ID: project yang berisi koneksi untuk menjalankan prosedur Spark.
    • CONNECTION_REGION: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya, us.
    • CONNECTION_ID: ID koneksi—misalnya, myconnection.

      Saat Anda melihat detail koneksi di konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir dari ID koneksi yang sepenuhnya memenuhi syarat yang ditampilkan di ID Koneksi—misalnya, projects/myproject/locations/connection_location/connections/myconnection.

    • RUNTIME_VERSION: versi runtime Spark—misalnya, 1.1.
    • MAIN_PYTHON_FILE_URI: jalur ke file PySpark—misalnya, gs://mybucket/mypysparkmain.py.

      Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan CREATE PROCEDURE, tambahkan PYSPARK_CODE setelah LANGUAGE PYTHON AS seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.

    • PYSPARK_CODE: definisi aplikasi PySpark dalam pernyataan CREATE PROCEDURE jika Anda ingin meneruskan isi prosedur secara inline.

      Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan "\n"; dapat direpresentasikan sebagai salah satu dari string berikut:

      • String kutipan: "return \"\\n\";". Tanda petik dan garis miring terbalik akan di-escape.
      • String yang dikutip tiga kali: """return "\\n";""". Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak.
      • String mentah: r"""return "\n";""". Tidak diperlukan escaping.
      Untuk mempelajari cara menambahkan kode PySpark inline, lihat Menggunakan kode inline.
    • MAIN_JAR_URI: jalur file JAR yang berisi class main, misalnya, gs://mybucket/my_main.jar.
    • CLASS_NAME: nama class yang sepenuhnya memenuhi syarat dalam JAR yang ditetapkan dengan opsi jar_uris, misalnya, com.example.wordcount.
    • URI: jalur file JAR yang berisi class yang ditentukan dalam class main, misalnya, gs://mybucket/mypysparkmain.jar.

    Untuk opsi tambahan yang dapat Anda masukkan di OPTIONS, lihat daftar opsi prosedur.

Menggunakan editor PySpark

Saat membuat prosedur menggunakan editor PySpark, Anda tidak perlu menggunakan pernyataan CREATE PROCEDURE. Sebagai gantinya, tambahkan kode Python Anda langsung di editor Pyspark dan simpan atau jalankan kode Anda.

Untuk membuat prosedur tersimpan untuk Spark di editor PySpark, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Jika Anda ingin mengetik kode PySpark secara langsung, buka editor PySpark. Untuk membuka editor PySpark, klik menu di samping Create SQL query, lalu pilih Create PySpark Procedure.

  3. Untuk menetapkan opsi, klik More > PySpark Options, lalu lakukan hal berikut:

    1. Tentukan lokasi tempat Anda ingin menjalankan kode PySpark.

    2. Di kolom Connection, tentukan koneksi Spark.

    3. Di bagian Stored procedure invocation, tentukan set data tempat Anda ingin menyimpan prosedur tersimpan sementara yang dihasilkan. Anda dapat menetapkan set data tertentu atau mengizinkan penggunaan set data sementara untuk memanggil kode PySpark.

      Set data sementara dibuat dengan lokasi yang ditentukan pada langkah sebelumnya. Jika nama set data disebutkan, pastikan set data dan koneksi Spark berada di lokasi yang sama.

    4. Di bagian Parameter, tentukan parameter untuk prosedur tersimpan. Nilai parameter hanya digunakan selama menjalankan kode PySpark dalam sesi, tetapi deklarasinya disimpan dalam prosedur.

    5. Di bagian Opsi lanjutan, tentukan opsi prosedur. Untuk daftar lengkap opsi prosedur, lihat daftar opsi prosedur.

    6. Di bagian Properties, tambahkan pasangan nilai kunci untuk mengonfigurasi tugas. Anda dapat menggunakan salah satu pasangan nilai kunci dari properti Dataproc Serverless Spark.

    7. Di Setelan akun layanan, tentukan akun layanan kustom,CMEK, set data staging, dan folder Cloud Storage staging yang akan digunakan selama pengoperasian kode PySpark dalam sesi.

    8. Klik Simpan.

Menyimpan prosedur tersimpan untuk Spark

Setelah membuat prosedur tersimpan menggunakan editor PySpark, Anda dapat menyimpan prosedur tersebut. Untuk melakukannya, ikuti langkah-langkah berikut:

  1. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, buat prosedur tersimpan untuk Spark menggunakan Python dengan editor PySpark.

  3. Klik Simpan > Simpan prosedur.

  4. Pada dialog Save stored procedure, sebutkan nama set data tempat Anda ingin menyimpan prosedur dan nama prosedur tersebut.

  5. Klik Simpan.

    Jika hanya ingin menjalankan kode PySpark, bukan menyimpannya sebagai prosedur tersimpan, Anda dapat mengklik Run, bukan Save.

Menggunakan container kustom

Container kustom menyediakan lingkungan runtime untuk proses driver dan eksekutor workload. Untuk menggunakan penampung kustom, gunakan kode contoh berikut:

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Ganti kode berikut:

  • PROJECT_ID: project tempat Anda ingin membuat prosedur tersimpan—misalnya, myproject.
  • DATASET: set data tempat Anda ingin membuat prosedur tersimpan—misalnya, mydataset.
  • PROCEDURE_NAME: nama prosedur tersimpan yang ingin Anda jalankan di BigQuery—misalnya, mysparkprocedure.
  • PROCEDURE_ARGUMENT: parameter untuk memasukkan argumen input.

    Dalam parameter ini, tentukan kolom berikut:

    • ARGUMENT_MODE: mode argumen.

      Nilai valid yang mencakup: IN, OUT, dan INOUT. Secara default, nilainya adalah IN.

    • ARGUMENT_NAME: nama argumen.
    • ARGUMENT_TYPE: jenis argumen.

    Misalnya: myproject.mydataset.mysparkproc(num INT64).

    Untuk mengetahui informasi selengkapnya, lihat meneruskan nilai sebagai parameter IN atau parameter OUT dan INOUT dalam dokumen ini.

  • CONNECTION_PROJECT_ID: project yang berisi koneksi untuk menjalankan prosedur Spark.
  • CONNECTION_REGION: region yang berisi koneksi untuk menjalankan prosedur Spark—misalnya, us.
  • CONNECTION_ID: ID koneksi, misalnya, myconnection.

    Saat Anda melihat detail koneksi di konsol Google Cloud, ID koneksi adalah nilai di bagian terakhir dari ID koneksi yang sepenuhnya memenuhi syarat yang ditampilkan di ID Koneksi—misalnya, projects/myproject/locations/connection_location/connections/myconnection.

  • RUNTIME_VERSION: versi runtime Spark—misalnya, 1.1.
  • MAIN_PYTHON_FILE_URI: jalur ke file PySpark—misalnya, gs://mybucket/mypysparkmain.py.

    Atau, jika Anda ingin menambahkan isi prosedur tersimpan dalam pernyataan CREATE PROCEDURE, tambahkan PYSPARK_CODE setelah LANGUAGE PYTHON AS seperti yang ditunjukkan dalam contoh di Menggunakan kode inline dalam dokumen ini.

  • PYSPARK_CODE: definisi aplikasi PySpark dalam pernyataan CREATE PROCEDURE jika Anda ingin meneruskan isi prosedur secara inline.

    Nilainya adalah literal string. Jika kode menyertakan tanda kutip dan garis miring terbalik, keduanya harus di-escape atau direpresentasikan sebagai string mentah. Misalnya, kode yang menampilkan "\n"; dapat direpresentasikan sebagai salah satu dari string berikut:

    • String kutipan: "return \"\\n\";". Tanda petik dan garis miring terbalik akan di-escape.
    • String yang dikutip tiga kali: """return "\\n";""". Garis miring terbalik akan di-escape, sedangkan tanda kutip tidak.
    • String mentah: r"""return "\n";""". Tidak diperlukan escaping.
    Untuk mempelajari cara menambahkan kode PySpark inline, lihat Menggunakan kode inline.
  • CONTAINER_IMAGE: jalur gambar di registry artefak. File ini hanya boleh berisi library yang akan digunakan dalam prosedur Anda. Jika tidak ditentukan, image penampung default sistem yang terkait dengan versi runtime akan digunakan.

Untuk mengetahui informasi selengkapnya tentang cara mem-build image container kustom dengan Spark, lihat Mem-build image container kustom.

Memanggil prosedur tersimpan Spark

Setelah membuat prosedur tersimpan, Anda dapat memanggilnya dengan salah satu opsi berikut:

Konsol

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di panel Explorer, luaskan project Anda dan pilih prosedur tersimpan untuk Spark yang ingin Anda jalankan.

  3. Di jendela Stored procedure info, klik Invoke stored procedure. Atau, Anda dapat meluaskan opsi View actions, lalu mengklik Invoke.

  4. Klik Run.

  5. Di bagian All results, klik View results.

  6. Opsional: Di bagian Query results, ikuti langkah-langkah berikut ini:

    • Jika Anda ingin melihat log driver Spark, klik Execution details.

    • Jika Anda ingin melihat log di Cloud Logging, klik Job information, lalu di kolom Log, klik log

    • Jika Anda ingin mendapatkan endpoint Spark History Server, klik Job information, lalu klik Spark history server.

SQL

Untuk memanggil prosedur tersimpan, gunakan pernyataan CALL PROCEDURE:

  1. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, masukkan pernyataan berikut:

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Klik Run.

Untuk informasi selengkapnya tentang cara menjalankan kueri, lihat Menjalankan kueri interaktif.

Menggunakan akun layanan khusus

Alih-alih menggunakan identitas layanan koneksi Spark untuk akses data, Anda dapat menggunakan akun layanan kustom untuk mengakses data dalam kode Spark.

Untuk menggunakan akun layanan kustom, tentukan mode keamanan INVOKER (menggunakan pernyataan EXTERNAL SECURITY INVOKER) saat Anda membuat prosedur tersimpan Spark, dan tentukan akun layanan saat Anda memanggil prosedur tersimpan.

Jika ingin mengakses dan menggunakan kode Spark dari Cloud Storage, Anda harus memberikan izin yang diperlukan ke identitas layanan koneksi Spark. Anda harus memberikan izin IAM storage.objects.get atau peran IAM storage.objectViewer ke akun layanan koneksi.

Secara opsional, Anda dapat memberikan akses akun layanan koneksi ke Dataproc Metastore dan Dataproc Persistent History Server jika telah menentukannya dalam koneksi. Untuk mengetahui informasi selengkapnya, lihat Memberikan akses ke akun layanan.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Secara opsional, Anda dapat menambahkan argumen berikut ke kode sebelumnya:

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Ganti kode berikut:

  • CUSTOM_SERVICE_ACCOUNT: Wajib diisi. Akun layanan kustom yang disediakan oleh Anda.
  • BUCKET_NAME: Opsional. Bucket Cloud Storage yang digunakan sebagai sistem file aplikasi Spark default. Jika tidak disediakan, bucket Cloud Storage default akan dibuat di project Anda dan bucket tersebut akan dibagikan oleh semua tugas yang berjalan dalam project yang sama.
  • DATASET: Opsional. Set data untuk menyimpan data sementara yang dihasilkan dengan memanggil prosedur. Data akan dihapus setelah tugas selesai. Jika tidak disediakan, set data sementara default akan dibuat untuk tugas.

Akun layanan kustom Anda harus memiliki izin berikut:

  • Untuk membaca dan menulis ke bucket staging yang digunakan sebagai sistem file aplikasi Spark default:

    • Izin storage.objects.* atau peran IAM roles/storage.objectAdmin di bucket staging yang Anda tentukan.
    • Selain itu, izin storage.buckets.* atau peran IAM roles/storage.Admin di project jika bucket staging tidak ditentukan.
  • (Opsional) Untuk membaca dan menulis data dari dan ke BigQuery:

    • bigquery.tables.* di tabel BigQuery Anda.
    • bigquery.readsessions.* di project Anda.
    • Peran IAM roles/bigquery.admin mencakup izin sebelumnya.
  • (Opsional) Untuk membaca dan menulis data dari dan ke Cloud Storage:

    • Izin storage.objects.* atau peran IAM roles/storage.objectAdmin pada objek Cloud Storage Anda.
  • (Opsional) Untuk membaca dan menulis ke set data staging yang digunakan untuk parameter INOUT/OUT:

    • Peran IAM bigquery.tables.* atau roles/bigquery.dataEditor di set data staging yang Anda tentukan.
    • Selain itu, izin bigquery.datasets.create atau peran IAM roles/bigquery.dataEditor di project jika set data staging tidak ditentukan.

Contoh prosedur tersimpan untuk Spark

Bagian ini menunjukkan contoh pembuatan prosedur tersimpan untuk Apache Spark.

Menggunakan PySpark atau file JAR di Cloud Storage

Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark menggunakan koneksi my-project-id.us.my-connection dan PySpark atau file JAR yang disimpan di bucket Cloud Storage:

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java atau Scala

Gunakan main_file_uri untuk membuat prosedur tersimpan:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Gunakan main_class untuk membuat prosedur tersimpan:

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Menggunakan kode inline

Contoh berikut menunjukkan cara membuat prosedur tersimpan untuk Spark menggunakan my-project-id.us.my-connection koneksi dan kode PySpark inline:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Meneruskan nilai sebagai parameter input

Contoh berikut menampilkan dua metode untuk meneruskan nilai sebagai parameter input di Python:

Metode 1: Menggunakan variabel lingkungan

Dalam kode PySpark, Anda dapat memperoleh parameter input prosedur tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format BIGQUERY_PROC_PARAM.PARAMETER_NAME, dengan PARAMETER_NAME sebagai nama parameter input. Misalnya, jika nama parameter input adalah var, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var. Parameter input dienkode JSON. Di kode PySpark, Anda bisa mendapatkan nilai parameter input dalam string JSON dari variabel lingkungan dan mendekodenya ke variabel Python.

Contoh berikut menunjukkan cara mendapatkan nilai parameter input jenis INT64 ke dalam kode PySpark Anda:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Metode 2: Menggunakan library bawaan

Dalam kode PySpark, Anda cukup mengimpor library bawaan dan menggunakannya untuk mengisi semua jenis parameter. Untuk meneruskan parameter ke eksekutor, isi parameter di driver Spark sebagai variabel Python dan teruskan nilainya ke eksekutor. Library bawaan mendukung sebagian besar jenis data BigQuery, kecuali INTERVAL, GEOGRAPHY, NUMERIC, dan BIGNUMERIC.

Jenis data BigQuery Jenis data Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Tidak Didukung
BIGNUMERIC Tidak Didukung
INTERVAL Tidak Didukung
GEOGRAPHY Tidak Didukung

Contoh berikut menunjukkan cara mengimpor library bawaan dan menggunakannya untuk mengisi parameter input jenis INT64 dan parameter input jenis ARRAY<STRUCT<a INT64, b STRING>> ke kode PySpark:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Dalam kode Java atau Scala, Anda dapat memperoleh parameter input dari prosedur tersimpan untuk Spark melalui variabel lingkungan di driver dan eksekutor Spark. Nama variabel lingkungan memiliki format BIGQUERY_PROC_PARAM.PARAMETER_NAME, dengan PARAMETER_NAME sebagai nama parameter input. Misalnya, jika nama parameter input adalah var, nama variabel lingkungan yang sesuai adalah BIGQUERY_PROC_PARAM.var. Di kode Java atau Scala, Anda bisa mendapatkan nilai parameter input dari variabel lingkungan.

Contoh berikut menunjukkan cara mendapatkan nilai parameter input dari variabel lingkungan ke dalam kode Scala Anda:

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

Contoh berikut menunjukkan cara mendapatkan parameter input dari variabel lingkungan ke dalam kode Java Anda:

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Meneruskan nilai sebagai parameter OUT dan INOUT

Parameter output menampilkan nilai dari prosedur Spark, sedangkan parameter INOUT menerima nilai untuk prosedur dan menampilkan nilai dari prosedur. Untuk menggunakan parameter OUT dan INOUT, tambahkan kata kunci OUT atau INOUT sebelum nama parameter saat membuat prosedur Spark. Dalam kode PySpark, Anda menggunakan library bawaan untuk menampilkan nilai sebagai parameter OUT atau INOUT. Sama seperti parameter input, library bawaan mendukung sebagian besar jenis data BigQuery kecuali INTERVAL, GEOGRAPHY, NUMERIC, dan BIGNUMERIC. Jenis nilai TIME dan DATETIME dikonversi ke zona waktu UTC saat ditampilkan sebagai parameter OUT atau INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Membaca dari tabel Hive Metastore dan menulis hasilnya ke BigQuery

Contoh berikut menunjukkan cara mengubah tabel Hive Metastore dan menulis hasilnya ke BigQuery:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Melihat filter log

Setelah memanggil prosedur tersimpan untuk Spark, Anda dapat melihat informasi log. Untuk mendapatkan informasi filter Cloud Logging dan endpoint Cluster Histori Spark, gunakan perintah bq show. Informasi filter tersedia di kolom SparkStatistics dari tugas turunan. Untuk mendapatkan filter log, ikuti langkah-langkah berikut:

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, cantumkan tugas turunan dari tugas skrip prosedur tersimpan:

    bq ls -j --parent_job_id=$parent_job_id

    Untuk mempelajari cara mendapatkan ID tugas, lihat Melihat detail tugas.

    Outputnya mirip seperti ini:

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifikasi jobId untuk prosedur tersimpan Anda dan gunakan perintah bq show untuk melihat detail tugas:

    bq show --format=prettyjson --job $child_job_id

    Salin kolom sparkStatistics karena Anda akan memerlukannya untuk langkah lain.

    Outputnya mirip seperti ini:

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Untuk Logging, buat filter log dengan kolom SparkStatistics:

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Log ditulis dalam resource yang dimonitor bigquery.googleapis.com/SparkJob. Log diberi label berdasarkan komponen INFO, DRIVER, dan EXECUTOR. Untuk memfilter log dari driver Spark, tambahkan komponen labels.component = "DRIVER" ke filter log. Untuk memfilter log dari eksekutor Spark, tambahkan komponen labels.component = "EXECUTOR" ke filter log.

Menggunakan kunci enkripsi yang dikelola pelanggan

Prosedur BigQuery Spark menggunakan kunci enkripsi yang dikelola pelanggan (CMEK) untuk melindungi konten Anda, bersama dengan enkripsi default yang disediakan oleh BigQuery. Untuk menggunakan CMEK dalam prosedur Spark, picu pembuatan akun layanan enkripsi BigQuery dan berikan izin yang diperlukan terlebih dahulu. Prosedur Spark juga mendukung kebijakan organisasi CMEK jika diterapkan ke project Anda.

Jika prosedur tersimpan Anda menggunakan mode keamanan INVOKER, CMEK Anda harus ditentukan melalui variabel sistem SQL saat memanggil prosedur. Jika tidak, CMEK Anda dapat ditentukan melalui koneksi yang terkait dengan prosedur tersimpan.

Untuk menentukan CMEK melalui koneksi saat Anda membuat prosedur tersimpan Spark, gunakan kode contoh berikut:

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Untuk menentukan CMEK melalui variabel sistem SQL saat memanggil prosedur, gunakan kode contoh berikut:

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Menggunakan Kontrol Layanan VPC

Kontrol Layanan VPC memungkinkan Anda menyiapkan perimeter yang aman untuk mencegah pemindahan data yang tidak sah. Agar dapat menggunakan Kontrol Layanan VPC dengan prosedur Spark untuk keamanan tambahan, buat perimeter layanan terlebih dahulu.

Untuk sepenuhnya melindungi tugas prosedur Spark Anda, tambahkan API berikut ke perimeter layanan:

  • BigQuery API (bigquery.googleapis.com)
  • Cloud Logging API (logging.googleapis.com)
  • Cloud Storage API (storage.googleapis.com), jika Anda menggunakan Cloud Storage
  • Artifact Registry API (artifactregistry.googleapis.com) atau Container Registry API (containerregistry.googleapis.com), jika Anda menggunakan penampung kustom
  • Dataproc Metastore API (metastore.googleapis.com) dan Cloud Run Admin API (run.googleapis.com), jika Anda menggunakan Dataproc Metastore

Tambahkan project kueri prosedur spark ke dalam perimeter. Tambahkan project lain yang menghosting kode atau data Spark Anda ke perimeter.

Praktik terbaik

  • Saat Anda menggunakan koneksi dalam project untuk pertama kalinya, penyediaan memerlukan waktu satu menit lebih lama. Untuk menghemat waktu, Anda dapat menggunakan kembali koneksi Spark yang ada saat membuat prosedur tersimpan untuk Spark.

  • Saat Anda membuat prosedur Spark untuk penggunaan produksi, Google merekomendasikan untuk menentukan versi runtime. Untuk daftar versi runtime yang didukung, lihat Versi runtime Dataproc Serverless. Sebaiknya gunakan versi Dukungan Jangka Panjang (LTS).

  • Saat Anda menentukan penampung kustom dalam prosedur Spark, sebaiknya gunakan Artifact Registry dan streaming image.

  • Untuk performa yang lebih baik, Anda dapat menentukan properti alokasi resource dalam prosedur Spark. Prosedur tersimpan Spark mendukung daftar properti alokasi resource yang sama dengan Dataproc Serverless.

Batasan

  • Anda hanya bisa menggunakan protokol endpoint gRPC untuk terhubung ke Dataproc Metastore. Jenis Hive Metastore lainnya belum didukung.
  • Kunci enkripsi yang dikelola pelanggan (CMEK) hanya tersedia saat pelanggan membuat prosedur Spark satu region. Kunci CMEK region global dan kunci CMEK multi-region, misalnya, EU atau US, tidak didukung.
  • Meneruskan parameter output hanya didukung untuk PySpark.
  • Jika set data yang terkait dengan prosedur tersimpan untuk Spark direplikasi ke region tujuan melalui replikasi set data lintas region, prosedur tersimpan hanya dapat dikueri di region tempat kode dibuat.
  • Spark tidak mendukung akses ke endpoint HTTP di jaringan Kontrol Layanan VPC pribadi Anda.

Kuota dan batas

Untuk mendapatkan informasi tentang kuota dan batas, lihat prosedur tersimpan untuk kuota dan batas Spark.

Langkah berikutnya