在 BigQuery Studio 筆記本中執行 PySpark 程式碼

本文說明如何在 BigQuery Python 筆記本中執行 PySpark 程式碼。

事前準備

請建立 Google Cloud 專案和 Cloud Storage bucket

  1. 設定專案

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. 如果沒有可用的 Cloud Storage bucket,請在專案中建立一個

    7. 設定筆記本

      1. 筆記本憑證:根據預設,筆記本工作階段會使用使用者憑證。如要為工作階段指定服務帳戶憑證,該帳戶必須具有 Dataproc 工作者 (roles/dataproc.worker 角色)。詳情請參閱「Dataproc 無伺服器服務帳戶」。
      2. 筆記本執行階段:除非選取其他執行階段,否則筆記本會使用預設的 Vertex 執行階段。如要定義自己的執行階段,請在 Google Cloud 控制台的「Runtimes」(執行階段) 頁面中建立執行階段。
    8. 定價

      如需價格資訊,請參閱 BigQuery Notebook 執行階段價格

      開啟 BigQuery Studio Python 筆記本

      1. 前往 Google Cloud 控制台的「BigQuery」頁面。

        前往 BigQuery

      2. 在詳細資料窗格的分頁列中,按一下「+」符號旁的箭頭 ,然後按一下「記事本」

      在 BigQuery Studio 筆記本中建立 Spark 工作階段

      您可以使用 BigQuery Studio Python 筆記本建立 Spark Connect 互動式工作階段。每個 BigQuery Studio 筆記本只能有一個相關聯的有效 Dataproc Serverless 工作階段。

      您可以在 BigQuery Studio Python 筆記本中,透過下列方式建立 Spark 工作階段:

      • 在筆記本中設定及建立單一工作階段。
      • Dataproc Serverless for Spark 互動式工作階段範本中設定 Spark 工作階段,然後使用範本在筆記本中設定及建立工作階段。BigQuery 提供 Query using Spark 功能,可協助您開始編寫範本化工作階段的程式碼,如「範本化 Spark 工作階段」分頁標籤所述。

      單次

      如要在新筆記本中建立 Spark 工作階段,請按照下列步驟操作:

      1. 在編輯器窗格的分頁列中,按一下「+」符號旁的向下箭頭 ,然後點選「Notebook」(筆記本)

        螢幕截圖:顯示 BigQuery 介面,以及用於建立新筆記本的「+」按鈕。
      2. 在筆記本儲存格中複製並執行下列程式碼,即可設定及建立基本 Spark 工作階段。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      更改下列內容:

      • APP_NAME:工作階段的選用名稱。
      • 選用工作階段設定:您可以新增 Dataproc API Session 設定,自訂工作階段。以下舉幾個例子說明:
        • RuntimeConfig
          程式碼說明,顯示 session.runtime.config 選項。
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig
          程式碼說明,顯示 session-environment-config-execution-config 選項。
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      範本化的 Spark 工作階段

      您可以在筆記本儲存格中輸入並執行程式碼,根據現有的 Dataproc Serverless 工作階段範本建立 Spark 工作階段。您在筆記本程式碼中提供的任何 session 設定,都會覆寫在工作階段範本中設定的相同設定。

      如要快速上手,請使用Query using Spark範本,預先填入 Spark 會話範本程式碼至筆記本:

      1. 在編輯器窗格的分頁列中,按一下「+」符號旁的向下箭頭 ,然後點選「Notebook」(筆記本)
        螢幕截圖:顯示 BigQuery 介面,以及用於建立新筆記本的「+」按鈕。
      2. 在「從範本開始」下方,按一下「使用 Spark 查詢」,然後點選「使用範本」,將程式碼插入筆記本。
        BigQuery UI 選項,可從範本開始
      3. 請按照「附註」一節的說明指定變數。
      4. 您可以刪除插入筆記本中的任何額外範例程式碼儲存格。
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      取代下列項目:

      在 BigQuery Studio 筆記本中編寫及執行 PySpark 程式碼

      在筆記本中建立 Spark 工作階段後,即可使用該工作階段在筆記本中執行 Spark 筆記本程式碼。

      支援 Spark Connect PySpark API:Spark Connect 筆記本工作階段支援大多數 PySpark API,包括 DataFrameFunctionsColumn,但不支援 SparkContextRDD 等 PySpark API。詳情請參閱「Spark 3.5 支援的項目」。

      Dataproc 專屬 API:Dataproc 擴充了 addArtifacts 方法,可簡化將 PyPI 套件動態新增至 Spark 工作階段的程序。您可以採用 version-scheme 格式指定清單 (類似 pip install)。這會指示 Spark Connect 伺服器在所有叢集節點上安裝套件及其依附元件,讓工作站可將這些套件用於 UDF。

      以下範例會在叢集上安裝指定的 textdistance 版本和最新相容的 random2 程式庫,讓使用 textdistancerandom2 的 UDF 在工作節點上執行。

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      筆記本程式碼說明:在 BigQuery Studio 筆記本中,將指標懸停在類別或方法名稱上時,系統會提供程式碼說明;輸入程式碼時,系統會提供程式碼自動完成說明。

      在以下範例中,輸入 DataprocSparkSession。並將指標懸停在這個類別名稱上,即可顯示程式碼完成和文件說明。

      程式碼文件和程式碼完成提示範例。

      BigQuery Studio 筆記本 PySpark 範例

      本節提供 BigQuery Studio Python 筆記本範例,其中包含 PySpark 程式碼,可執行下列工作:

      • 對公開的莎士比亞資料集執行字詞計算。
      • 建立 Iceberg 資料表,並將中繼資料儲存在 BigLake Metastore 中。

      Wordcount

      下列 Pyspark 範例會建立 Spark 工作階段,然後計算公開 bigquery-public-data.samples.shakespeare 資料集中出現的字詞。

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      更改下列內容:

      • APP_NAME:工作階段的選用名稱。

      輸出內容:

      儲存格輸出內容會列出字數統計輸出內容的範例。如要在 Google Cloud 控制台中查看工作階段詳細資料,請按一下「互動式工作階段詳細資料檢視畫面」連結。如要監控 Spark 工作階段,請在工作階段詳細資料頁面中,按一下「查看 Spark UI」

      在主控台的工作階段詳細資料頁面中,查看「Spark UI」按鈕
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Iceberg 資料表

      執行 PySpark 程式碼,使用 BigLake metastore 中繼資料建立 Iceberg 資料表

      下列範例程式碼會建立 sample_iceberg_table,其中包含儲存在 BigLake Metastore 中的資料表中繼資料,然後查詢該資料表。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      注意:

      • PROJECT:您的專案 ID,列於 Google Cloud 控制台資訊主頁的「專案資訊」部分。
      • REGIONSUBNET_NAME:指定 Compute Engine 地區,以及工作階段地區中的子網路名稱。Dataproc Serverless 會在指定的子網路上啟用私人 Google 存取權 (PGA)
      • LOCATION:預設值為 BigQuery_metastore_config.locationspark.sql.catalog.{catalog}.gcp_location,但您可以選擇任何US支援的 BigQuery 位置
      • BUCKETWAREHOUSE_DIRECTORY:用於 Iceberg 倉庫目錄的 Cloud Storage 值區和資料夾。
      • CATALOG_NAMENAMESPACE:Iceberg 目錄名稱和命名空間會合併,用於識別 Iceberg 資料表 (catalog.namespace.table_name)。
      • APP_NAME:工作階段的選用名稱。

      儲存格輸出內容會列出新增資料欄的 sample_iceberg_table,並在 Google Cloud 控制台中顯示「互動式工作階段詳細資料」頁面的連結。您可以在工作階段詳細資料頁面按一下「查看 Spark UI」,監控 Spark 工作階段。

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      在 BigQuery 中查看資料表詳細資料

      請按照下列步驟,在 BigQuery 中查看 Iceberg 資料表詳細資料:

      1. 前往 Google Cloud 控制台的「BigQuery」頁面。

        前往 BigQuery

      2. 在專案資源窗格中,按一下專案,然後按一下命名空間,列出 sample_iceberg_table 資料表。按一下「詳細資料」資料表,即可查看「開啟目錄資料表設定」資訊。

        輸入和輸出格式是 Iceberg 使用的標準 Hadoop InputFormatOutputFormat 類別格式。

        BigQuery UI 中列出的 Iceberg 資料表的中繼資料

      其他範例

      從 Pandas DataFrame (df) 建立 Spark DataFrame (sdf)。

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      在 Spark DataFrames 上執行匯總作業。

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      使用 Spark-BigQuery 連接器從 BigQuery 讀取資料。

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      使用 Gemini Code Assist 撰寫 Spark 程式碼

      您可以要求 Gemini Code Assist 在筆記本中生成 PySpark 程式碼。Gemini Code Assist 會擷取並使用相關的 BigQuery 和 Dataproc Metastore 資料表及其結構定義,產生程式碼回應。

      如要在筆記本中生成 Gemini Code Assist 程式碼,請按照下列步驟操作:

      1. 按一下工具列中的「+ 程式碼」,插入新的程式碼儲存格。 新的程式碼儲存格會顯示 Start coding or generate with AI。 點選「生成」

      2. 在「生成」編輯器中輸入自然語言提示,然後按一下 enter請務必在提示中加入關鍵字 sparkpyspark

        提示範例:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        輸出內容範例:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      使用 Gemini Code Assist 生成程式碼的提示

      結束問答時間

      如要在 BigQuery Studio 筆記本中停止 Spark Connect 工作階段,可以採取下列任一動作:

      • 在筆記本儲存格中執行 spark.stop()
      • 終止筆記本中的執行階段:
        1. 按一下執行階段選取器,然後按一下「管理工作階段」
          管理工作階段選取畫面
        2. 在「Active sessions」(有效工作階段) 對話方塊中,按一下終止圖示,然後點選「Terminate」(終止)
          在「運作中的工作階段」對話方塊中終止選取的工作階段

      協調 BigQuery Studio 筆記本程式碼

      您可以透過下列方式協調 BigQuery Studio 筆記本程式碼:

      從 Google Cloud 控制台排定筆記本程式碼

      您可以透過下列方式排定筆記本程式碼:

      以 Dataproc Serverless 批次工作負載的形式執行筆記本程式碼

      請完成下列步驟,以 Dataproc Serverless 批次工作負載的形式,執行 BigQuery Studio 筆記本程式碼。

      1. 將筆記本程式碼下載到本機終端機或 Cloud Shell 的檔案中。

        1. 在 Google Cloud 控制台的「BigQuery Studio」頁面上,開啟「探索器」面板中的筆記本。

        2. 依序選取「檔案」選單中的「下載」,然後選擇 Download .py,即可下載筆記本程式碼。

          在「探索工具」頁面中,依序點選「檔案」>「下載」選單。
      2. 生成 requirements.txt

        1. 在儲存 .py 檔案的目錄中安裝 pipreqs
          pip install pipreqs
          
        2. 執行 pipreqs 即可生成 requirements.txt

          pipreqs filename.py
          

        3. 使用 Google Cloud CLI 將本機 requirements.txt 檔案複製到 Cloud Storage 中的 bucket。

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. 編輯下載的 .py 檔案,更新 Spark 工作階段程式碼。

        1. 移除或註解排除任何 Shell 指令碼指令。

        2. 移除設定 Spark 工作階段的程式碼,然後將設定參數指定為批次工作負載提交參數。(請參閱「提交 Spark 批次工作負載」)。

          範例:

          • 從程式碼中移除下列工作階段子網路設定行:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • 執行批次工作負載時,請使用 --subnet 旗標指定子網路。

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. 使用簡單的工作階段建立程式碼片段。

          • 簡化前下載的筆記本程式碼範例。

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • 簡化後的批次工作負載程式碼。

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. 執行批次工作負載。

        1. 如需操作說明,請參閱「提交 Spark 批次工作負載」。

          • 請務必加入 --deps-bucket 標記,指向包含 Yourrequirements.txt 檔案的 Cloud Storage bucket。

            範例:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          注意:

          • FILENAME:下載並編輯的筆記本程式碼檔案名稱。
          • REGION:叢集所在的 Compute Engine 地區
          • BUCKET:包含 requirements.txt 檔案的 Cloud Storage 值區名稱。
          • --version:選取 Spark 執行階段 2.3 版,執行批次工作負載。
      5. 提交程式碼。

        1. 測試批次工作負載程式碼後,您可以使用 git 用戶端 (例如 GitHub、GitLab 或 Bitbucket) 將 .ipynb.py 檔案提交至存放區,做為 CI/CD pipeline 的一部分。
      6. 使用 Cloud Composer 排定批次工作負載。

        1. 如需相關操作說明,請參閱「使用 Cloud Composer 執行 Dataproc Serverless 工作負載」。

      排解筆記本錯誤

      如果含有 Spark 程式碼的儲存格發生失敗,您可以按一下儲存格輸出內容中的「Interactive Session Detail View」(互動式工作階段詳細資料檢視畫面) 連結,排解錯誤 (請參閱「Wordcount and Iceberg table examples」(字數統計和 Iceberg 表格範例))。

      已知問題和解決方案

      錯誤:使用 Python 版本 3.10 建立的 Notebook 執行階段嘗試連線至 Spark 工作階段時,可能會導致 PYTHON_VERSION_MISMATCH 錯誤。

      解決方法:使用 Python 版本 3.11 重新建立執行階段。

      後續步驟