Use Apache Spark with HBase on Dataproc

Objectives

This tutorial shows you how to:

  1. Create a Dataproc cluster, installing Apache HBase and Apache ZooKeeper on the cluster
  2. Create an HBase table using the HBase shell running on the master node of the Dataproc cluster
  3. Use Cloud Shell to submit a Java or PySpark Spark job to the Dataproc service that writes data to, then reads data from, the HBase table

Costs

This tutorial uses the following billable components of Google Cloud:

  • Dataproc
  • Compute Engine

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

If you haven't already done so, create a Google Cloud Platform project.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Create a Dataproc cluster

  1. Run the following command in a Cloud Shell session terminal to:

    • Install the HBase and ZooKeeper components
    • Provision three worker nodes (three to five workers are recommended to run the code in this tutorial)
    • Enable the Component Gateway
    • Use image version 2.0
    • Use the --properties flag to add the HBase config and HBase library to the Spark driver and executor classpaths.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
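The --properties value packs two Spark settings into a single comma-separated flag. As a quick sanity check, here is a minimal sketch (plain Python; the helper function is hypothetical, but the property names and classpath come from the command above) that assembles the same string:

```python
# Build the --properties value used above: two Spark classpath settings
# joined into one comma-separated string (helper name is hypothetical).
hbase_classpath = "/etc/hbase/conf:/usr/lib/hbase/*"

def build_properties(classpath):
    props = {
        "spark.driver.extraClassPath": classpath,
        "spark.executor.extraClassPath": classpath,
    }
    # Python dicts preserve insertion order, so the output is deterministic.
    return ",".join(f"{key}={value}" for key, value in props.items())

print(build_properties(hbase_classpath))
```

Both properties point at the same paths: the HBase configuration directory plus every jar under /usr/lib/hbase, so the driver and the executors resolve HBase classes identically.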

Verify connector installation

  1. From the console or a Cloud Shell session terminal, SSH into the Dataproc cluster master node.

  2. Verify the installation of the Apache HBase Spark connector on the master node:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Sample output:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Keep the SSH session terminal open to:

    1. Create an HBase table
    2. (Java users): run commands on the master node of the cluster to determine the versions of components installed on the cluster
    3. Scan your HBase table after you run the code

Create an HBase table

Run the commands listed in this section in the master node SSH session terminal that you opened in the previous step.

  1. Open the HBase shell:

    hbase shell
    

  2. Create an HBase table named 'my_table' with a 'cf' column family:

    create 'my_table','cf'
    

    1. To confirm table creation, in the console, click HBase in the Component Gateway links to open the Apache HBase UI. my_table is listed in the Tables section on the Home page.
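Conceptually, the table you just created stores cells addressed by a row key plus a column family:qualifier pair. A tiny sketch (plain Python dicts, purely illustrative) of the layout that the tutorial's Spark job will produce in my_table:

```python
# Illustrative model of the HBase layout after the tutorial's Spark job runs:
# each row key maps to cells keyed by "column_family:qualifier".
my_table = {
    "key1": {"cf:name": "foo"},
    "key2": {"cf:name": "bar"},
}

# An HBase scan walks rows in lexicographic row-key order.
for row_key in sorted(my_table):
    for column, value in my_table[row_key].items():
        print(f"{row_key}  column={column}, value={value}")
```

This is the same shape you will see later when you run `scan 'my_table'` in the HBase shell.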

View the Spark code

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds = spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset<Row> dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

# Data source format for the Apache HBase Spark connector
data_source_format = 'org.apache.hadoop.hbase.spark'

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write \
  .format('org.apache.hadoop.hbase.spark') \
  .options(catalog=catalog) \
  .option("hbase.spark.use.hbasecontext", "false") \
  .mode("overwrite") \
  .save()

# Read from HBase
result = spark.read \
  .format('org.apache.hadoop.hbase.spark') \
  .options(catalog=catalog) \
  .option("hbase.spark.use.hbasecontext", "false") \
  .load()
result.show()
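The catalog string passed to the connector must be valid JSON once the `''.join(...split())` idiom compacts it to a single line (splitting on all whitespace, then rejoining with nothing). A small standalone sketch (plain Python, no Spark required) that builds the same catalog and verifies its shape:

```python
import json

# Same whitespace-stripping trick as the PySpark script above: split on all
# whitespace, then rejoin, yielding a compact single-line JSON string.
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

parsed = json.loads(catalog)  # raises ValueError if the catalog is malformed
assert parsed["table"]["name"] == "my_table"
assert parsed["rowkey"] == "key"
assert set(parsed["columns"]) == {"key", "name"}
print(catalog)
```

Note that the trick removes every whitespace character, including any inside JSON string values, so it is safe here only because none of the names or qualifiers contain spaces.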

Run the code

  1. Open a Cloud Shell session terminal.

  2. Clone the GitHub GoogleCloudDataproc/cloud-dataproc repository into your Cloud Shell session terminal:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Change to the cloud-dataproc/spark-hbase directory:

    cd cloud-dataproc/spark-hbase
    
    Sample output:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Submit the Dataproc job.

Java

  1. Set component versions in pom.xml file.
    1. The Dataproc 2.0.x release versions page lists the Scala, Spark, and HBase component versions installed with the most recent and previous four 2.0 image subminor versions.
      1. To find the subminor version of your 2.0 image version cluster, click the cluster name on the Clusters page in the Google Cloud console to open the Cluster details page, where the cluster Image version is listed.
    2. Alternatively, you can run the following commands in an SSH session terminal from the master node of your cluster to determine component versions:
      1. Check the Scala version:
        scala -version
        
      2. Check the Spark version (press Control+D to exit):
        spark-shell
        
      3. Check the HBase version:
        hbase version
        
      4. Identify the Spark, Scala, and HBase version dependencies in the Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0</hbase-spark.version>
        </properties>
        
        Note: The hbase-spark.version is the current Spark HBase connector version; leave this version number unchanged.
    3. Edit the pom.xml file in the Cloud Shell editor to insert the correct Scala, Spark, and HBase version numbers. Click Open Terminal when you finish editing to return to the Cloud Shell terminal command line.
      cloudshell edit .
      
    4. Switch to Java 8 in Cloud Shell. This JDK version is needed to build the code (you can ignore any plugin warning messages):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verify Java 8 installation:
      java -version
      
      Sample output:
      openjdk version "1.8..."
       
  2. Build the jar file:
    mvn clean package
    
    The .jar file is placed in the /target subdirectory (for example, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Submit the job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Insert the name of your .jar file after "target/" and before ".jar".
    • If you did not set the Spark driver and executor HBase classpaths when you created your cluster, you must set them with each job submission by including the following ‑‑properties flag in your job submit command:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. View HBase table output in the Cloud Shell session terminal output:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Submit the job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • If you did not set the Spark driver and executor HBase classpaths when you created your cluster, you must set them with each job submission by including the following ‑‑properties flag in your job submit command:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. View HBase table output in the Cloud Shell session terminal output:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Scan the HBase table

You can scan the content of your HBase table by running the following commands in the master node SSH session terminal that you opened in Verify connector installation:

  1. Open the HBase shell:
    hbase shell
    
  2. Scan 'my_table':
    scan 'my_table'
    
    Sample output:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Clean up

After you finish the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the cluster

  • To delete your cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=region