Dataproc での HBase と Apache Spark の併用

目標

このチュートリアルでは、次の方法について説明します。

  1. Dataproc クラスタを作成し、クラスタに Apache HBase と Apache ZooKeeper をインストールする
  2. Dataproc クラスタのマスターノードで実行されている HBase シェルを使用して HBase テーブルを作成する。
  3. Cloud Shell を使用して、HBase テーブルへのデータの書き込みと読み取りを行う Java または PySpark Spark ジョブを Dataproc サービスに送信します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

準備

Google Cloud Platform プロジェクトをまだ作成していない場合は、作成します。

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Dataproc and Compute Engine API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  7. Dataproc and Compute Engine API を有効にします。

    API を有効にする

Dataproc クラスタを作成する

  1. Cloud Shell セッション ターミナルで次のコマンドを実行して、次の操作を行います。

    • HBase コンポーネントと ZooKeeper コンポーネントをインストールする
    • 3 つのワーカーノードをプロビジョニングします(このチュートリアルのコードを実行するには、3 ~ 5 つのワーカーをおすすめします)。
    • コンポーネント ゲートウェイを有効にします。
    • イメージ バージョン 2.0 を使用する
    • Use the --properties flag to add the HBase config and HBase library to the Spark driver and executor classpaths.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

コネクタのインストールを確認する

  1. Cloud Console または Cloud Shell セッション ターミナルから、SSH で Dataproc クラスタ マスターノードに接続します。

  2. マスターノードに Apache HBase Spark コネクタがインストールされていることを確認します。

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    出力例:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. SSH セッション ターミナルを開いたままにしておきます。

    1. HBase テーブルの作成
    2. (Java ユーザー): クラスタのマスターノードでコマンドを実行して、クラスタにインストールされているコンポーネントのバージョンを確認します。
    3. コードを実行した後、Hbase テーブルをスキャンする

HBase テーブルの作成

前の手順で開いたマスターノード SSH セッション ターミナルで、このセクションに記載されているコマンドを実行します。

  1. HBase シェルを開きます。

    hbase shell
    

  2. 「cf」列ファミリーを使用して HBase「my-table」を作成します。

    create 'my_table','cf'
    

    1. テーブルの作成を確認するには、Cloud Console で Cloud Console コンポーネント ゲートウェイのリンクの [HBase] をクリックし、Apache HBase UI を開きます。my-table は、[ホーム] ページの [テーブル] セクションに表示されます。

Spark コードを表示する

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

コードの実行

  1. Cloud Shell セッション ターミナルを開きます。

  2. Cloud Shell セッション ターミナルに GitHub GoogleCloudDataproc/cloud-dataproc リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. cloud-dataproc/spark-hbase ディレクトリに移動します。

    cd cloud-dataproc/spark-hbase
    
    出力例:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Dataproc ジョブを送信する

Java

  1. pom.xml ファイルでコンポーネントのバージョンを設定します。
    1. Dataproc の 2.0.x リリース バージョンのページには、イメージ 2.0 のサブマイナー バージョンの最新の 4 つと最新の Scala、Spark、HBase コンポーネントのバージョンがリストされています。
      1. 2.0 イメージ バージョンのクラスタのサブマイナー バージョンを確認するには、Google Cloud Console の [クラスタ] ページでクラスタ名をクリックして開きます。 [クラスタの詳細] ページ。クラスタのイメージのバージョンが一覧表示されます。
    2. または、クラスタのマスターノードから SSH セッション ターミナルで次のコマンドを実行して、コンポーネントのバージョンを確認することもできます。
      1. Scala のバージョンを確認します。
        scala -version
        
      2. Spark のバージョンを確認します(終了するには Ctrl+D)。
        spark-shell
        
      3. HBase のバージョンを確認します。
        hbase version
        
      4. Maven pom.xml で Spark、Scala、HBase のバージョンの依存関係を特定します。
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        注: hbase-spark.version は現在の Spark HBase コネクタのバージョンです。このバージョン番号は変更しないでください。
    3. Cloud Shell エディタで pom.xml ファイルを編集して、正しい Scala、Spark、HBase のバージョン番号を挿入します。編集が完了したら [ターミナルを開く] をクリックして、Cloud Shell ターミナルのコマンドラインに戻ります。
      cloudshell edit .
      
    4. Cloud Shell で Java 8 に切り替えます。この JDK バージョンは、コードをビルドするために必要です(プラグインの警告メッセージは無視してかまいません)。
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Java 8 のインストールを確認する
      java -version
      
      出力例:
      openjdk version "1.8..."
       
  2. jar ファイルをビルドします。
    mvn clean package
    
    .jar ファイルは、/target サブディレクトリに配置されます(たとえば、target/spark-hbase-1.0-SNAPSHOT.jar)。
  3. ジョブを送信します。

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: .jar ファイルの名前を「target/」の後と「.jar」の前に挿入します。
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、次の ‑‑properties を含めて各ジョブ送信でこれらを設定し、フラグを指定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Cloud Shell セッションターミナルの出力に HBase テーブルの出力を表示します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. ジョブを送信します。

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、次の ‑‑properties を含めて各ジョブ送信でこれらを設定し、フラグを指定する必要があります。
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Cloud Shell セッションターミナルの出力に HBase テーブルの出力を表示します。

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

HBase テーブルをスキャンする

HBase テーブルの内容をスキャンするには、コネクタのインストールを確認するで開いたマスターノード SSH セッション ターミナルで次のコマンドを実行します。

  1. HBase シェルを開きます。
    hbase shell
    
  2. scan 'my-table'
    scan 'my_table'
    
    出力例:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

クラスタの削除

  • クラスタを削除するには:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}