在 Dataproc 上将 Apache Spark 与 HBase 搭配使用


目标

本教程将介绍如何执行以下操作:

  1. 创建一个 Dataproc 集群,在该集群上安装 Apache HBase 和 ApacheZooKeeper
  2. 使用在 Dataproc 集群的主节点上运行的 HBase shell 创建 HBase 表
  3. 使用 Cloud Shell 将 Java 或 PySpark Spark 作业提交到向 HBase 表写入数据,然后从中读取数据的 Dataproc 服务

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

如果您尚未创建 Google Cloud Platform 项目,请首先创建该项目。

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataproc and Compute Engine API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Dataproc and Compute Engine API。

    启用 API

创建 Dataproc 集群

  1. Cloud Shell 会话终端中运行以下命令,以便:

    • 安装 HBaseZooKeeper 组件
    • 预配 3 个工作器节点(建议 3 到 5 个工作器运行本教程中的代码)
    • 启用组件网关
    • 使用映像版本 2.0
    • 使用 --properties 标志将 HBase 配置和 HBase 库添加到 Spark 驱动程序和执行器类路径。
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

验证连接器安装

  1. 通过 Google Cloud 控制台或 Cloud Shell 会话终端,通过 SSH 连接到 Dataproc 集群主服务器

  2. 验证主节点上是否安装了 Apache HBase Spark 连接器

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    示例输出:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. 使 SSH 会话终端保持打开状态,以便:

    1. 创建 HBase 表
    2. (Java 用户):在集群的主节点上运行命令,以确定集群上安装的组件的版本
    3. 运行代码扫描 Hbase 表

创建 HBase 表

在您上一步打开的主节点 SSH 会话终端中运行本部分列出的命令。

  1. 打开 HBase shell:

    hbase shell
    

  2. 使用“cf”列族创建 HBase“my-table”:

    create 'my_table','cf'
    

    1. 如需确认创建表,请在 Google Cloud 控制台中,点击 Google Cloud 控制台组件网关链接中的 HBase,以打开 Apache HBase 界面。my-table 列于首页上的部分中。

查看 Spark 代码

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

运行代码

  1. 打开 Cloud Shell 会话终端。

  2. 将 GitHub GoogleCloudDataproc/cloud-dataproc 代码库克隆到 Cloud Shell 会话终端:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. 切换到 cloud-dataproc/spark-hbase 目录:

    cd cloud-dataproc/spark-hbase
    
    示例输出:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. 提交 Dataproc 作业。

Java

  1. pom.xml 文件中设置组件版本。
    1. Dataproc 2.0.x 发布版本页面列出了安装有最近和最后四个映像 2.0 次要版本的 Scala、Spark 和 HBase 组件版本。
      1. 如需查找 2.0 映像版本集群的次要版本,请点击 Google Cloud 控制台集群页面上的集群名称以打开集群详情页面,其中列出了集群映像版本
    2. 或者,您也可以在 SSH 会话终端中通过集群的主节点运行以下命令来确定组件版本:
      1. 检查 scala 版本:
        scala -version
        
      2. 检查 Spark 版本(按 Ctrl+D 退出):
        spark-shell
        
      3. 检查 HBase 版本:
        hbase version
        
      4. 确定 Maven pom.xml 中的 Spark、Scala 和 HBase 版本依赖项:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注意:hbase-spark.version 是当前的 Spark HBase 连接器版本;请勿更改此版本号。
    3. 在 Cloud Shell 编辑器中修改 pom.xml 文件,以插入正确的 Scala、Spark 和 HBase 版本号。完成修改后,点击打开终端,以返回到 Cloud Shell 终端命令行。
      cloudshell edit .
      
    4. 在 Cloud Shell 中切换到 Java 8。构建代码需要此 JDK 版本(您可以忽略任何插件警告消息):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. 验证 Java 8 的安装情况:
      java -version
      
      示例输出:
      openjdk version "1.8..."
       
  2. 构建 jar 文件:
    mvn clean package
    
    .jar 文件位于 /target 子目录(例如 target/spark-hbase-1.0-SNAPSHOT.jar)中。
  3. 提交作业。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars:将 .jar 文件的名称插入“target/”之后、“.jar”文件之前。
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在每次提交作业时设置它们,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. 提交作业。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在每次提交作业时设置它们,方法是在作业提交命令中添加以下 ‑‑properties 标志:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

扫描 HBase 表

您可以在验证连接器安装中打开的主节点 SSH 会话终端中运行以下命令,扫描 HBase 表的内容:

  1. 打开 HBase shell:
    hbase shell
    
  2. 扫描“my-table”:
    scan 'my_table'
    
    示例输出:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除集群

  • 如需删除您的集群,请输入以下命令:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}