在 Dataproc 上将 Apache Spark 与 HBase 搭配使用


目标

本教程将介绍如何执行以下操作:

  1. 创建一个 Dataproc 集群,在该集群上安装 Apache HBase 和 ApacheZooKeeper
  2. 使用在 Dataproc 集群的主节点上运行的 HBase shell 创建 HBase 表
  3. 使用 Cloud Shell 将 Java 或 PySpark Spark 作业提交到向 HBase 表写入数据,然后从中读取数据的 Dataproc 服务

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

如果您尚未创建 Google Cloud Platform 项目,请首先创建该项目。

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataproc and Compute Engine API。

    启用 API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Dataproc and Compute Engine API。

    启用 API

创建 Dataproc 集群

  1. Cloud Shell 会话终端中运行以下命令,以便:

    • 安装 HBaseZooKeeper 组件
    • 预配 3 个工作器节点(建议 3 到 5 个工作器运行本教程中的代码)
    • 启用组件网关
    • 使用映像版本 2.0
    • 使用 --properties 标志将 HBase 配置和 HBase 库添加到 Spark 驱动程序和执行器类路径。
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

验证连接器安装

  1. 通过 Google Cloud 控制台或 Cloud Shell 会话终端,通过 SSH 连接到 Dataproc 集群主服务器

  2. 验证主节点上是否安装了 Apache HBase Spark 连接器

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    示例输出:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. 使 SSH 会话终端保持打开状态,以便:

    1. 创建 HBase 表
    2. (Java 用户):在集群的主节点上运行命令,以确定集群上安装的组件的版本
    3. 运行代码扫描 Hbase 表

创建 HBase 表

在您上一步打开的主节点 SSH 会话终端中运行本部分列出的命令。

  1. 打开 HBase shell:

    hbase shell
    

  2. 使用“cf”列族创建 HBase“my-table”:

    create 'my_table','cf'
    

    1. 如需确认创建表,请在 Google Cloud 控制台中,点击 Google Cloud 控制台组件网关链接中的 HBase,以打开 Apache HBase 界面。my-table 列于首页上的部分中。

查看 Spark 代码

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

运行代码

  1. 打开 Cloud Shell 会话终端。

  2. 将 GitHub GoogleCloudDataproc/cloud-dataproc 代码库克隆到 Cloud Shell 会话终端:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. 切换到 cloud-dataproc/spark-hbase 目录:

    cd cloud-dataproc/spark-hbase
    
    示例输出:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. 提交 Dataproc 作业。

Java

  1. pom.xml 文件中设置组件版本。
    1. Dataproc 2.0.x 发布版本页面列出了安装有最近和最后四个映像 2.0 次要版本的 Scala、Spark 和 HBase 组件版本。
      1. 如需查找 2.0 映像版本集群的次要版本,请点击 Google Cloud 控制台集群页面上的集群名称以打开集群详情页面,其中列出了集群映像版本
    2. 或者,您也可以在 SSH 会话终端中通过集群的主节点运行以下命令来确定组件版本:
      1. 检查 scala 版本:
        scala -version
        
      2. 检查 Spark 版本(按 Ctrl+D 退出):
        spark-shell
        
      3. 检查 HBase 版本:
        hbase version
        
      4. 确定 Maven pom.xml 中的 Spark、Scala 和 HBase 版本依赖项:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注意:hbase-spark.version 是当前的 Spark HBase 连接器版本;请勿更改此版本号。
    3. 在 Cloud Shell 编辑器中修改 pom.xml 文件,以插入正确的 Scala、Spark 和 HBase 版本号。完成修改后,点击打开终端,以返回到 Cloud Shell 终端命令行。
      cloudshell edit .
      
    4. 在 Cloud Shell 中切换到 Java 8。构建代码需要此 JDK 版本(您可以忽略任何插件警告消息):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. 验证 Java 8 的安装情况:
      java -version
      
      示例输出:
      openjdk version "1.8..."
       
  2. 构建 jar 文件:
    mvn clean package
    
    .jar 文件位于 /target 子目录(例如 target/spark-hbase-1.0-SNAPSHOT.jar)中。
  3. 提交作业。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars:将 .jar 文件的名称插入“target/”之后、“.jar”文件之前。
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在每次提交作业时设置它们,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. 提交作业。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在每次提交作业时设置它们,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

扫描 HBase 表

您可以在验证连接器安装中打开的主节点 SSH 会话终端中运行以下命令,扫描 HBase 表的内容:

  1. 打开 HBase shell:
    hbase shell
    
  2. 扫描“my-table”:
    scan 'my_table'
    
    示例输出:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除集群

  • 如需删除您的集群,请输入以下命令:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}