Menggunakan metastore BigQuery dengan Spark di BigQuery Studio
Dokumen ini menjelaskan cara menggunakan metastore BigQuery dengan Spark di BigQuery Studio.
Anda dapat menggunakan Spark di BigQuery Studio untuk membuat tabel Iceberg dengan Apache Spark di BigQuery Studio. Setelah membuat tabel, Anda dapat membuat kueri data dari Spark. Anda juga dapat membuat kueri data yang sama dari konsol BigQuery menggunakan SQL.
Sebelum memulai
- Minta akses ke Spark di BigQuery Studio melalui formulir pendaftaran berikut.
- Aktifkan penagihan untuk project Google Cloud Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
Aktifkan BigQuery dan Dataflow API.
Opsional: Pahami cara kerja metastore BigQuery dan alasan Anda harus menggunakannya.
Peran yang diperlukan
Untuk mendapatkan izin yang diperlukan untuk menggunakan notebook Spark di BigQuery Studio, minta administrator Anda untuk memberi Anda peran IAM berikut:
-
Membuat tabel metastore BigQuery Studio di Spark:
BigQuery Data Editor (
roles/bigquery.dataEditor
) di project -
Buat sesi Spark dari tabel metastore notebook di Spark:
Dataproc Worker (
roles/dataproc.serverlessEditor
) di akun pengguna
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Menghubungkan dengan notebook
Contoh berikut menunjukkan cara mengonfigurasi notebook Spark untuk berinteraksi dengan tabel Iceberg yang disimpan di metastore BigQuery.
Dalam contoh ini, Anda menyiapkan sesi Spark, membuat namespace dan tabel, menambahkan beberapa data ke tabel, lalu membuat kueri data di BigQuery Studio.
Di notebook Apache Spark, sertakan impor Apache Spark yang diperlukan:
from dataproc_spark_session.session.spark.connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session from pyspark.sql import SparkSession
Tentukan katalog, namespace, dan direktori gudang.
catalog = "CATALOG_NAME" namespace = "NAMESPACE_NAME" warehouse_dir = "gs://WAREHOUSE_DIRECTORY"
Ganti kode berikut:
CATALOG_NAME
: nama katalog untuk mereferensikan tabel Spark Anda.NAMESPACE_NAME
: label namespace untuk mereferensikan tabel Spark Anda.WAREHOUSE_DIRECTORY
: URI folder Cloud Storage tempat data warehouse Anda disimpan.
Lakukan inisialisasi sesi Spark.
session.environment_config.execution_config.network_uri = NETWORK_NAME session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_project"] = "PROJECT_ID" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.gcp_location"] = "LOCATION" session.runtime_config.properties[f"spark.sql.catalog.CATALOG_NAME.warehouse"] = warehouse_dir spark = ( DataprocSparkSession.builder .appName("BigQuery metastore Iceberg table example") .dataprocConfig(session) .getOrCreate())
Ganti kode berikut:
NETWORK_NAME
: nama atau URI jaringan yang menjalankan kode Spark. Jika tidak ditentukan, jaringandefault
akan digunakan.PROJECT_ID
: ID project Google Cloud yang menjalankan kode Spark.LOCATION
: lokasi tempat menjalankan tugas Spark.
Buat katalog dan namespace.
spark.sql(f"USE `CATALOG_NAME`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;") spark.sql(f"USE `NAMESPACE_NAME`;")
Membuat tabel
spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME ;")
Ganti kode berikut:
TABLE_NAME
: nama untuk tabel Iceberg Anda.
Menjalankan Bahasa Manipulasi Data (DML) dari Spark.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");") df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Menjalankan Bahasa Definisi Data (DDL) dari Spark.
spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);") spark.sql("DESCRIBE TABLE_NAME ;")
Menyisipkan data ke dalam tabel.
spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"It's a sunny day!\", 83);")
Buat kueri tabel dari Spark.
df = spark.sql("SELECT * from TABLE_NAME ;") df.show()
Buat kueri tabel dari konsol Google Cloud dalam set data baru.
SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100