Menggunakan metastore BigQuery dengan prosedur tersimpan Spark

Dokumen ini menjelaskan cara menggunakan prosedur tersimpan Apache Spark dengan metastore BigQuery.

Sebelum memulai

  1. Aktifkan penagihan untuk project Google Cloud Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
  2. Aktifkan BigQuery dan Dataflow API.

    Aktifkan API

  3. Opsional: Pelajari lebih lanjut hal berikut:

Peran yang diperlukan

Untuk menggunakan prosedur tersimpan Spark, tinjau peran yang diperlukan untuk prosedur tersimpan dan berikan peran yang diperlukan.

Untuk mendapatkan izin yang diperlukan guna menggunakan Spark dan prosedur tersimpan dengan metastore BigQuery sebagai penyimpanan metadata, minta administrator untuk memberi Anda peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat dan menjalankan prosedur tersimpan

Contoh berikut menunjukkan cara membuat dan menjalankan prosedur tersimpan dengan metastore BigQuery.

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, tambahkan kode contoh berikut untuk pernyataan CREATE PROCEDURE.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigQuery metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Ganti kode berikut:

    • PROJECT_ID: ID project Google Cloud Anda.
    • BQ_DATASET_ID: ID set data di BigQuery yang berisi prosedur.
    • PROCEDURE_NAME: nama prosedur yang Anda buat atau ganti.
    • REGION: lokasi koneksi Spark Anda.
    • LOCATION: lokasi resource BigQuery Anda.
    • SPARK_CONNECTION_ID: ID koneksi Spark Anda.
    • CATALOG_NAME: nama katalog yang Anda gunakan.
    • WAREHOUSE_DIRECTORY: URI folder Cloud Storage yang berisi data warehouse Anda.
    • NAMESPACE_NAME: namespace yang Anda gunakan.

Langkah selanjutnya