在 Dataproc 上将 Apache Spark 与 HBase 搭配使用


目标

本教程将介绍如何执行以下操作:

  1. 创建 Dataproc 集群,并在集群上安装 Apache HBase 和 Apache ZooKeeper
  2. 使用在 Dataproc 集群的主节点上运行的 HBase shell 创建 HBase 表
  3. 使用 Cloud Shell 将 Java 或 PySpark Spark 作业提交到 Dataproc 服务,以将数据写入 HBase 表,然后从 HBase 表读取表。

费用

在本文档中,您将使用 Google Cloud的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。

新 Google Cloud 用户可能有资格申请免费试用

准备工作

如果您尚未创建 Google Cloud Platform 项目,请首先创建该项目。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

创建 Dataproc 集群

  1. Cloud Shell 会话终端中运行以下命令,以执行以下操作:

    • 安装 HBaseZooKeeper 组件
    • 预配三个工作器节点(建议使用三到五个工作器来运行本教程中的代码)
    • 启用组件网关
    • 使用映像版本 2.0
    • 使用 --properties 标志将 HBase 配置和 HBase 库添加到 Spark 驱动程序和执行器类路径。
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

验证连接器安装

  1. 在 Google Cloud 控制台或 Cloud Shell 会话终端 中,通过 SSH 连接到 Dataproc 集群主节点

  2. 验证主节点上是否已安装 Apache HBase Spark 连接器

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    示例输出:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. 保持 SSH 会话终端打开状态,以执行以下操作:

    1. 创建 HBase 表
    2. (Java 用户):对集群的主节点运行命令,以确定集群上安装的组件的版本
    3. 运行代码扫描 Hbase 表

创建 HBase 表

在上一步中打开的主节点 SSH 会话终端中运行本部分中列出的命令。

  1. 打开 HBase shell:

    hbase shell
    

  2. 创建包含“cf”列族的 HBase“my-table”:

    create 'my_table','cf'
    

    1. 如需确认创建表,请在 Google Cloud 控制台中点击 Google Cloud 控制台组件网关链接中的 HBase,以打开 Apache HBase 界面。my-table 列在首页页面的部分中。

查看 Spark 代码

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

运行代码

  1. 打开 Cloud Shell 会话终端。

  2. 将 GitHub GoogleCloudDataproc/cloud-dataproc 代码库克隆到 Cloud Shell 会话终端中:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. 切换到 cloud-dataproc/spark-hbase 目录:

    cd cloud-dataproc/spark-hbase
    
    示例输出:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. 提交 Dataproc 作业。

Java

  1. pom.xml 文件中设置组件版本。
    1. Dataproc 2.0.x 发布版本页面列出了与最新和过去四个 2.0 次要版本映像一起安装的 Scala、Spark 和 HBase 组件版本。
      1. 如需查找 2.0 映像版本集群的次要版本,请点击 Google Cloud 控制台中集群页面上的集群名称,以打开集群详情页面,其中列出了集群映像版本
    2. 或者,您也可以在集群的主节点的 SSH 会话终端中运行以下命令,以确定组件版本:
      1. 检查 Scala 版本:
        scala -version
        
      2. 检查 Spark 版本(按 Control-D 退出):
        spark-shell
        
      3. 检查 HBase 版本:
        hbase version
        
      4. 在 Maven pom.xml 中识别 Spark、Scala 和 HBase 版本依赖项:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注意:hbase-spark.version 是当前的 Spark HBase 连接器版本;请勿更改此版本号。
    3. 在 Cloud Shell 编辑器中修改 pom.xml 文件,以插入正确的 Scala、Spark 和 HBase 版本号。完成修改后,点击打开终端以返回 Cloud Shell 终端命令行。
      cloudshell edit .
      
    4. 在 Cloud Shell 中切换到 Java 8。您需要使用此 JDK 版本来构建代码(可以忽略任何插件警告消息):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. 验证 Java 8 安装:
      java -version
      
      示例输出:
      openjdk version "1.8..."
       
  2. 构建 jar 文件:
    mvn clean package
    
    .jar 文件位于 /target 子目录中(例如 target/spark-hbase-1.0-SNAPSHOT.jar)。
  3. 提交作业。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars:将 .jar 文件的名称插入“target/”之后、“jar”之前。
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在提交每个作业时通过在作业提交命令中添加以下 ‑‑properties 标志来进行设置:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. 提交作业。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • 如果您在创建集群时未设置 Spark 驱动程序和执行器 HBase 类路径,则必须在提交每个作业时通过在作业提交命令中添加以下 ‑‑properties 标志来进行设置:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. 在 Cloud Shell 会话终端输出中查看 HBase 表输出:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

扫描 HBase 表

您可以在验证连接器安装中打开的主节点 SSH 会话终端中运行以下命令,以扫描 HBase 表的内容:

  1. 打开 HBase shell:
    hbase shell
    
  2. 扫描“my-table”:
    scan 'my_table'
    
    示例输出:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

清理

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除集群

  • 如需删除您的集群,请输入以下命令:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}