在 Dataproc 上使用 Apache Spark 搭配 HBase

目標

本教學課程將示範如何:

  1. 建立 Dataproc 叢集,並在叢集上安裝 Apache HBase 和 Apache ZooKeeper
  2. 使用在 Dataproc 叢集主要節點上執行的 HBase Shell 建立 HBase 資料表
  3. 使用 Cloud Shell 將 Java 或 PySpark Spark 工作提交至 Dataproc 服務,該服務會將資料寫入 HBase 資料表,然後從該資料表讀取資料

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

事前準備

如果您尚未建立 Google Cloud Platform 專案,請先建立專案。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  8. 建立 Dataproc 叢集

    1. Cloud Shell 工作階段終端機中執行下列指令,即可:

      • 安裝 HBaseZooKeeper 元件
      • 佈建三個工作站節點 (建議使用三到五個工作站,執行本教學課程中的程式碼)
      • 啟用元件閘道
      • 使用 2.0 版圖片
      • 使用 --properties 旗標,將 HBase 設定和 HBase 程式庫新增至 Spark 驅動程式和執行器類別路徑。
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    驗證連接器安裝作業

    1. 從 Google Cloud 控制台或 Cloud Shell 工作階段終端機,使用 SSH 連結至 Dataproc 叢集的主要節點

    2. 在主要節點上驗證 Apache HBase Spark 連接器的安裝作業:

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      輸出範例:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. 請將 SSH 工作階段終端機保持為開啟狀態,以便:

      1. 建立 HBase 資料表
      2. (Java 使用者):在叢集的主節點上執行指令,判斷叢集上安裝的元件版本
      3. 掃描 Hbase 資料表,然後執行程式碼

    建立 HBase 資料表

    在上一步開啟的主節點 SSH 工作階段終端機中,執行本節列出的指令。

    1. 開啟 HBase shell:

      hbase shell
      

    2. 建立 HBase「my-table」,並使用「cf」資料欄系列:

      create 'my_table','cf'
      

      1. 如要確認表格建立作業,請在 Google Cloud 控制台中,按一下Google Cloud 控制台元件閘道連結中的「HBase」HBase,開啟 Apache HBase UI。my-table 會列於「首頁」頁面的「資料表」區段中。

    查看 Spark 程式碼

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    執行程式碼

    1. 開啟 Cloud Shell 工作階段終端機。

    2. 將 GitHub GoogleCloudDataproc/cloud-dataproc 存放區複製到 Cloud Shell 工作階段終端機:

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. 切換至 cloud-dataproc/spark-hbase 目錄:

      cd cloud-dataproc/spark-hbase
      
      輸出範例:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. 提交 Dataproc 工作。

    Java

    1. pom.xml 檔案中設定元件版本。
      1. Dataproc 2.0.x 版頁面列出安裝在最新和最後四個映像檔 2.0 次要版本中的 Scala、Spark 和 HBase 元件版本。
        1. 如要找出 2.0 版映像檔叢集的次要版本,請在Google Cloud 主控台的「Clusters」(叢集) 頁面中,按一下叢集名稱開啟「Cluster details」(叢集詳細資料) 頁面,其中會列出叢集的「Image version」(映像檔版本)
      2. 或者,您也可以在叢集主要節點的 SSH 工作階段終端機中執行下列指令,判斷元件版本:
        1. 檢查 Scala 版本:
          scala -version
          
        2. 檢查 Spark 版本 (按 Ctrl + D 鍵即可退出):
          spark-shell
          
        3. 檢查 HBase 版本:
          hbase version
          
        4. 在 Maven pom.xml 中找出 Spark、Scala 和 HBase 版本依附元件:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          注意:hbase-spark.version 是目前的 Spark HBase 連接器版本,請勿變更這個版本號碼。
      3. 在 Cloud Shell 編輯器中編輯 pom.xml 檔案,插入正確的 Scala、Spark 和 HBase 版本號碼。編輯完成後,按一下「Open Terminal」(開啟終端機),返回 Cloud Shell 終端機指令列。
        cloudshell edit .
        
      4. 在 Cloud Shell 中切換至 Java 8。建構程式碼時需要這個 JDK 版本 (你可以忽略任何外掛程式警告訊息):
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. 驗證 Java 8 安裝狀態:
        java -version
        
        輸出範例:
        openjdk version "1.8..."
         
    2. 建構 jar 檔案:
      mvn clean package
      
      .jar 檔案會放在 /target 子目錄中 (例如 target/spark-hbase-1.0-SNAPSHOT.jar)。
    3. 提交工作。

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars:在「target/」之後和「.jar」之前插入 .jar 檔案的名稱。
      • 如果您在建立叢集時未設定 Spark 驅動程式和執行器 HBase 類別路徑,則必須在每次提交工作時設定,方法是在工作提交指令中加入下列 ‑‑properties 標記:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. 在 Cloud Shell 工作階段終端機輸出中,查看 HBase 資料表輸出內容:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. 提交工作。

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • 如果您在建立叢集時未設定 Spark 驅動程式和執行器 HBase 類別路徑,則必須在每次提交工作時設定,方法是在工作提交指令中加入下列 ‑‑properties 標記:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. 在 Cloud Shell 工作階段終端機輸出中,查看 HBase 資料表輸出內容:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    掃描 HBase 資料表

    如要掃描 HBase 表格的內容,請在「驗證連接器安裝作業」中開啟主要節點 SSH 工作階段終端機,然後執行下列指令:

    1. 開啟 HBase shell:
      hbase shell
      
    2. 掃描「my-table」:
      scan 'my_table'
      
      輸出範例:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      清除所用資源

      完成教學課程後,您可以清除所建立的資源,這樣資源就不會繼續使用配額,也不會產生費用。下列各節將說明如何刪除或關閉這些資源。

      刪除專案

      如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。

      如要刪除專案:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      刪除叢集

      • 如要刪除叢集:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}