Dataproc での HBase と Apache Spark の併用


目標

このチュートリアルでは、次の方法について説明します。

  1. Dataproc クラスタを作成し、クラスタに Apache HBase と Apache ZooKeeper をインストールする
  2. Dataproc クラスタのマスターノードで実行されている HBase シェルを使用して HBase テーブルを作成する
  3. Cloud Shell を使用して、Java または PySpark Spark ジョブを HBase テーブルへのデータの書き込みと読み取りを行う Dataproc サービスに送信する

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

準備

Google Cloud Platform プロジェクトをまだ作成していない場合は、作成します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Dataproc クラスタを作成する

  1. Cloud Shell セッション ターミナルで次のコマンドを実行して、以下の操作を行います。

    • HBaseZooKeeper のコンポーネントをインストールする
    • 3 つのワーカーノードをプロビジョニングする(このチュートリアルのコードを実行するには、3 ~ 5 つのワーカーをおすすめします)
    • コンポーネント ゲートウェイを有効にする
    • イメージ バージョン 2.0 を使用する
    • --properties フラグを使用して、Spark ドライバと executor のクラスパスに HBas 構成と HBas ライブラリを追加する
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

コネクタのインストールを確認する

  1. Google Cloud コンソールまたは Cloud Shell セッション ターミナルから、Dataproc クラスタ マスターノードにSSH で接続します。

  2. マスターノードへの Apache HBase Spark コネクタのインストールを確認します。

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    出力例:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. SSH セッション ターミナルを開いたままにして、以下の操作を行います。

    1. HBase テーブルを作成する
    2. (Java ユーザー): クラスタのマスターノードでコマンドを実行して、クラスタにインストールされているコンポーネントのバージョンを確認する
    3. コードを実行した後、Hbase テーブルをスキャンする

HBase テーブルを作成する

前のステップで開いたマスターノードの SSH セッション ターミナルの、このセクションに記載されているコマンドを実行します。

  1. HBase シェルを開きます。

    hbase shell
    

  2. 'cf' 列ファミリーを使用して HBase 'my-table' を作成します。

    create 'my_table','cf'
    

    1. テーブルの作成を確認するには、Google Cloud コンソールで、Google Cloud コンソール コンポーネント ゲートウェイのリンクの [HBase] をクリックし、Apache HBase UI を開きます。my-table は、[ホーム] ページの [テーブル] セクションに表示されます。

Spark コードを表示する

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

コードの実行

  1. Cloud Shell セッション ターミナルを開きます。

  2. Cloud Shell セッション ターミナルに GitHub の GoogleCloudDataproc/cloud-dataproc リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. cloud-dataproc/spark-hbase ディレクトリに移動します。

    cd cloud-dataproc/spark-hbase
    
    出力例:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Dataproc ジョブを送信する

Java

  1. コンポーネント ファイルを pom.xml ファイルで設定します。
    1. Dataproc の 2.0.x リリース バージョンのページには、イメージ 2.0 のサブマイナー バージョンの最新の 4 つと最新の Scala、Spark、HBase コンポーネントのバージョンがリストされています。
      1. 2.0 イメージ バージョンのクラスタのサブマイナー バージョンを確認するには、Google Cloud コンソールの [クラスタ] ページでクラスタ名をクリックして、[クラスタの詳細] ページを開きます。クラスタのイメージのバージョンが一覧表示されます。
    2. または、クラスタのマスターノードから SSH セッション ターミナルで次のコマンドを実行して、コンポーネントのバージョンを確認することもできます。
      1. Scala のバージョンを確認します。
        scala -version
        
      2. Spark のバージョンを確認します(control-D 終了)。
        spark-shell
        
      3. HBase のバージョンを確認します。
        hbase version
        
      4. Maven pom.xml で Spark、Scala、HBase のバージョンの依存関係を特定します。
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注: hbase-spark.version は、現在の Spark HBase コネクタのバージョンです。このバージョン番号は変更されません。
    3. Cloud Shell エディタで pom.xml ファイルを編集し、正しい Scala、Spark、HBase のバージョン番号を挿入します。編集が完了したら、[ターミナルを開く] をクリックし、Cloud Shell ターミナルのコマンドラインに戻ります。
      cloudshell edit .
      
    4. Cloud Shell で Java 8 に切り替えます。この JDK バージョンは、コードをビルドするために必要です(プラグインの警告メッセージは無視してかまいません)。
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Java 8 のインストールを確認する
      java -version
      
      出力例:
      openjdk version "1.8..."
       
  2. jar ファイルをビルドします。
    mvn clean package
    
    .jar ファイルは /target サブディレクトリに配置されます(例: target/spark-hbase-1.0-SNAPSHOT.jar)。
  3. ジョブを送信します。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: .jar ファイルの名前を「target/」の後、「.jar」の前に挿入します。
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、次の ‑‑properties を含めて各ジョブ送信でこれらを設定し、フラグを指定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Cloud Shell セッション ターミナルの出力に HBase テーブルの出力を表示します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. ジョブを送信します。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、次の ‑‑properties を含めて各ジョブ送信でこれらを設定し、フラグを指定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Cloud Shell セッション ターミナルの出力に HBase テーブルの出力を表示します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

HBase テーブルをスキャンする

HBase テーブルの内容をスキャンするには、コネクタのインストールを確認するで開いたマスターノードの SSH セッション ターミナルで次のコマンドを実行します。

  1. HBase シェルを開きます。
    hbase shell
    
  2. scan 'my-table'
    scan 'my_table'
    
    出力例:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

クラスタの削除

  • クラスタを削除するには:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}