在 Dataproc 上将 Apache Spark 与 HBase 搭配使用

目标

本教程将介绍如何执行以下操作:

  1. 创建 Dataproc 集群,并在集群上安装 Apache HBase 和 Apache ZooKeeper
  2. 使用在 Dataproc 集群的主节点上运行的 HBase shell 创建 HBase 表
  3. 使用 Cloud Shell 将 Java 或 PySpark Spark 作业提交到 Dataproc 服务,该服务会将数据写入 HBase 表并从中读取数据

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

如果您尚未创建 Google Cloud Platform 项目,请首先创建该项目。

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  4. 启用 Dataproc and Compute Engine API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  7. 启用 Dataproc and Compute Engine API。

    启用 API

创建 Dataproc 集群

  1. Cloud Shell 会话终端中运行以下命令,以执行以下操作:

    • 安装 HBaseZooKeeper 组件
    • 预配三个工作器节点(建议运行三到五个工作器)运行本教程中的代码
    • 启用组件网关
    • 使用映像版本 2.0
    • 使用 --properties 标志将 HBase 配置和 HBase 库添加到 Spark 驱动程序和执行程序类路径。
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

验证连接器安装

  1. 从 Cloud Console 或 Cloud Shell 会话终端通过 SSH 连接到 Dataproc 集群主节点

  2. 验证主节点上是否安装了 Apache HBase Spark 连接器

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    示例输出:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. 将 SSH 会话终端保持打开状态,以便:

    1. 创建 HBase 表
    2. (Java 用户)在集群的主节点上运行命令,以确定集群上安装的组件的版本
    3. 运行代码后,扫描您的 HBase 表

创建 HBase 表

在您在上一步中打开的主节点 SSH 会话终端中运行本部分列出的命令。

  1. 打开 HBase shell:

    hbase shell
    

  2. 创建一个具有“列族”的 HBase 表:

    create 'my_table','cf'
    

    1. 如需确认创建表,请在 Cloud Console 中点击 Cloud Console 组件网关链接中的 HBase 打开 Apache HBase 界面。my-table 列在首页部分。

查看 Spark 代码

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

运行代码

  1. 打开 Cloud Shell 会话终端。

  2. 将 GitHub GoogleCloudDataproc/cloud-dataproc 代码库克隆到您的 Cloud Shell 会话终端中:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. 切换到 cloud-dataproc/spark-hbase 目录:

    cd cloud-dataproc/spark-hbase
    
    示例输出:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. 提交 Dataproc 作业。

Java

  1. pom.xml 文件中设置组件版本。
    1. Dataproc 2.0.x 发布版本页面列出了安装有最新和最后一个四个映像 2.0 次要版本的 Scala、Spark 和 HBase 组件版本。
      1. 如需查找 2.0 映像版本的集群的次要版本,请点击 Google Cloud Console 中集群页面上的集群名称,打开集群详情页面,其中列出了集群映像版本
    2. 或者,您可以从集群的主节点在 SSH 会话终端中运行以下命令,确定组件版本:
      1. 检查 scala 版本:
        scala -version
        
      2. 检查 Spark 版本(Ctrl+D 退出):
        spark-shell
        
      3. 检查 HBase 版本:
        hbase version
        
      4. 识别 Maven pom.xml 中的 Spark、Scala 和 HBase 版本依赖项:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注意:hbase-spark.version 是当前的 Spark HBase 连接器版本;此版本版本号保持不变。
    3. 在 Cloud Shell 编辑器中修改 pom.xml 文件,以插入正确的 Scala、Spark 和 HBase 版本号。修改完毕后,点击打开终端以返回 Cloud Shell 终端命令行。
      cloudshell edit .
      
    4. 在 Cloud Shell 中切换到 Java 8。构建代码需要此 JDK 版本(您可以忽略任何插件警告消息):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. 验证 Java 8 的安装情况:
      java -version
      
      示例输出:
      openjdk version "1.8..."
       
  2. 构建 jar 文件:
    mvn clean package
    
    .jar 文件放在 /target 子目录中(例如 target/spark-hbase-1.0-SNAPSHOT.jar)。
  3. 提交作业。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars:将 .jar 文件的名称插入到“target/”和“.jar”之前。
    • 如果您在创建集群时未设置 Spark 驱动程序和执行程序 HBase 类路径,则必须在每次作业提交时设置这些变量,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. 提交作业。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • 如果您在创建集群时未设置 Spark 驱动程序和执行程序 HBase 类路径,则必须在每次作业提交时设置这些变量,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

扫描 HBase 表

通过在验证连接器安装中打开的主节点 SSH 会话终端中运行以下命令,您可以扫描 HBase 表的内容:

  1. 打开 HBase shell:
    hbase shell
    
  2. 扫描“我的表格”:
    scan 'my_table'
    
    示例输出:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除集群

  • 如需删除您的集群,请输入以下命令:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}