Dataproc での HBase と Apache Spark の併用


目標

このチュートリアルでは、次の方法について説明します。

  1. Dataproc クラスタを作成し、クラスタに Apache HBase と Apache ZooKeeper をインストールする
  2. Dataproc クラスタのマスターノードで実行されている HBase シェルを使用して HBase テーブルを作成する
  3. Cloud Shell を使用して、Java または PySpark Spark ジョブを Dataproc サービスに送信して HBase テーブルへのデータの書き込みと読み取りを行う

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

Google Cloud Platform プロジェクトをまだ作成していない場合は、作成します。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  8. Dataproc クラスタを作成する

    1. Cloud Shell セッション ターミナルで次のコマンドを実行して、以下の操作を行います。

      • HBase コンポーネントと ZooKeeper コンポーネントをインストールする
      • 3 つのワーカーノードをプロビジョニングする(このチュートリアルのコードを実行するには、3~5 つのワーカーをおすすめします)
      • コンポーネント ゲートウェイを有効にする
      • イメージ バージョン 2.0 を使用する
      • --properties フラグを使用して、Spark ドライバとエグゼキュータのクラスパスに HBase 構成と HBase ライブラリを追加する
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    コネクタのインストールを確認する

    1. Google Cloud コンソールまたは Cloud Shell セッション ターミナルから、Dataproc クラスタ マスターノードに SSH で接続します。

    2. マスターノードで Apache HBase Spark コネクタのインストールを確認します。

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      出力例:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. SSH セッション ターミナルを開いたままにして、以下の操作を行います。

      1. HBase テーブルを作成する
      2. (Java ユーザー): クラスタのマスターノードでコマンドを実行して、クラスタにインストールされているコンポーネントのバージョンを確認する
      3. コードを実行した後、Hbase テーブルをスキャンする

    HBase テーブルを作成する

    前の手順で開いたマスターノードの SSH セッション ターミナルで、このセクションに記載されているコマンドを実行します。

    1. HBase シェルを開きます。

      hbase shell
      

    2. 「cf」列ファミリーを持つ HBase の「my-table」を作成します。

      create 'my_table','cf'
      

      1. テーブルの作成を確認するには、 Google Cloud コンソールで、Google Cloud コンソール コンポーネント ゲートウェイのリンクの [HBase] をクリックし、Apache HBase UI を開きます。my-table は、[ホーム] ページの [テーブル] セクションに表示されます。

    Spark コードを表示する

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    コードを実行する

    1. Cloud Shell セッション ターミナルを開きます。

    2. Cloud Shell セッション ターミナルに GitHub の GoogleCloudDataproc/cloud-dataproc リポジトリのクローンを作成します。

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. cloud-dataproc/spark-hbase ディレクトリに変更します。

      cd cloud-dataproc/spark-hbase
      
      出力例:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Dataproc ジョブを送信します。

    Java

    1. pom.xml ファイルでコンポーネントのバージョンを設定します。
      1. Dataproc 2.0.x リリース バージョンのページには、イメージ 2.0 の最新および直前の 4 つのサブマイナー バージョンにインストールされている Scala、Spark、HBase コンポーネントのバージョンが記載されています。
        1. 2.0 イメージ バージョンのクラスタのサブマイナー バージョンを確認するには、Google Cloud コンソールの [クラスタ] ページでクラスタ名をクリックして、[クラスタの詳細] ページを開きます。クラスタのイメージ バージョンが一覧表示されます。
      2. または、クラスタのマスターノードから SSH セッション ターミナルで次のコマンドを実行して、コンポーネントのバージョンを確認することもできます。
        1. Scala のバージョンを確認します。
          scala -version
          
        2. Spark のバージョンを確認します(Ctrl+D キーを押して終了します)。
          spark-shell
          
        3. HBase のバージョンを確認します。
          hbase version
          
        4. Maven の pom.xml で、Spark、Scala、HBase のバージョン依存関係を特定します。
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          注: hbase-spark.version は現在の Spark HBase コネクタのバージョンです。このバージョン番号は変更しないでください。
      3. Cloud Shell エディタで pom.xml ファイルを編集し、正しい Scala、Spark、HBase のバージョン番号を挿入します。編集が完了したら、[ターミナルを開く] をクリックして Cloud Shell ターミナル コマンドラインに戻ります。
        cloudshell edit .
        
      4. Cloud Shell で Java 8 に切り替えます。コードをビルドするには、この JDK バージョンが必要です(プラグインの警告メッセージは無視してかまいません)。
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Java 8 のインストールを確認します。
        java -version
        
        出力例:
        openjdk version "1.8..."
         
    2. jar ファイルをビルドします。
      mvn clean package
      
      .jar ファイルは /target サブディレクトリに配置されます(例: target/spark-hbase-1.0-SNAPSHOT.jar)。
    3. ジョブを送信します。

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars: .jar ファイルの名前を「target/」の後、「.jar」の前に挿入します。
      • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、ジョブ送信コマンドで次の ‑‑properties フラグを指定して、ジョブ送信ごとにクラスパスを設定する必要があります。
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. Cloud Shell セッション ターミナルの出力で HBase テーブルの出力を確認します。

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. ジョブを送信します。

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • クラスタを作成したときに Spark ドライバとエグゼキュータの HBase クラスパスを設定しなかった場合、ジョブ送信コマンドで次の ‑‑properties フラグを指定して、ジョブ送信ごとにクラスパスを設定する必要があります。
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. Cloud Shell セッション ターミナルの出力で HBase テーブルの出力を確認します。

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    HBase テーブルをスキャンする

    HBase テーブルのコンテンツをスキャンするには、コネクタのインストールを確認するで開いたマスターノードの SSH セッション ターミナルで次のコマンドを実行します。

    1. HBase シェルを開きます。
      hbase shell
      
    2. 「my-table」をスキャンします。
      scan 'my_table'
      
      出力例:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      クリーンアップ

      チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

      プロジェクトの削除

      課金されないようにする最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

      プロジェクトを削除するには:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      クラスタを削除する

      • クラスタを削除するには:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}