Menggunakan metastore BigQuery dengan prosedur tersimpan Spark
Dokumen ini menjelaskan cara menggunakan prosedur tersimpan Apache Spark dengan metastore BigQuery.
Sebelum memulai
- Aktifkan penagihan untuk project Google Cloud Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
Aktifkan BigQuery dan Dataflow API.
Opsional: Pelajari lebih lanjut hal berikut:
- Pahami cara kerja metastore BigQuery dan alasan Anda harus menggunakannya.
- Pelajari cara kerja prosedur tersimpan BigQuery Spark dan selesaikan tugas sebelum memulai.
Peran yang diperlukan
Untuk menggunakan prosedur tersimpan Spark, tinjau peran yang diperlukan untuk prosedur tersimpan dan berikan peran yang diperlukan.
Untuk mendapatkan izin yang diperlukan guna menggunakan Spark dan prosedur tersimpan dengan metastore BigQuery sebagai penyimpanan metadata, minta administrator untuk memberi Anda peran IAM berikut:
-
Buat tabel metastore BigQuery di Spark:
-
BigQuery Data Editor (
roles/bigquery.dataEditor
) di akun layanan Spark Connection dalam project -
Storage Object Admin (
roles/storage.objectAdmin
) di akun layanan Spark Connection dalam project
-
BigQuery Data Editor (
-
Buat kueri tabel metastore BigQuery di BigQuery:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) di project -
Pengguna BigQuery (
roles/bigquery.user
) di project -
Storage Object Viewer (
roles/storage.objectViewer
) di project
-
BigQuery Data Viewer (
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Membuat dan menjalankan prosedur tersimpan
Contoh berikut menunjukkan cara membuat dan menjalankan prosedur tersimpan dengan metastore BigQuery.
Buka halaman BigQuery.
Di editor kueri, tambahkan kode contoh berikut untuk pernyataan
CREATE PROCEDURE
.CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigQuery metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud Anda.BQ_DATASET_ID
: ID set data di BigQuery yang berisi prosedur.PROCEDURE_NAME
: nama prosedur yang Anda buat atau ganti.REGION
: lokasi koneksi Spark Anda.LOCATION
: lokasi resource BigQuery Anda.SPARK_CONNECTION_ID
: ID koneksi Spark Anda.CATALOG_NAME
: nama katalog yang Anda gunakan.WAREHOUSE_DIRECTORY
: URI folder Cloud Storage yang berisi data warehouse Anda.NAMESPACE_NAME
: namespace yang Anda gunakan.
Langkah selanjutnya
- Siapkan fitur metastore BigQuery opsional.
- Melihat dan membuat kueri tabel dari Spark di konsol BigQuery.