Utilizzare Apache Spark con HBase su Dataproc


Obiettivi

Questo tutorial illustra come:

  1. Crea un cluster Dataproc, installando Apache HBase e Apache ZooKeeper sul cluster
  2. Crea una tabella HBase utilizzando la shell HBase in esecuzione sul nodo principale del cluster Dataproc
  3. Utilizza Cloud Shell per inviare un job Java o PySpark Spark alla Servizio Dataproc che scrive i dati e poi legge dalla tabella HBase

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi basata sull'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Prima di iniziare

Se non l'hai ancora fatto, crea un progetto Google Cloud Platform.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Crea un cluster Dataproc

  1. Esegui questo comando in un Terminale di sessione Cloud Shell per:

    • Installa i componenti HBase e ZooKeeper
    • Esegui il provisioning di tre nodi worker (sono consigliati da tre a cinque worker esegui il codice in questo tutorial)
    • Attiva il gateway dei componenti.
    • Utilizzare la versione 2.0 dell'immagine
    • Utilizza il flag --properties per aggiungere la configurazione e la libreria HBase ai percorsi di classe del driver e dell'executor di Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verificare l'installazione del connettore

  1. Dalla console Google Cloud o da un terminale di sessione Cloud Shell Accedi tramite SSH al nodo master del cluster Dataproc.

  2. Verifica l'installazione del connettore Spark di Apache HBase sul nodo master:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Esempio di output:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantieni aperto il terminale della sessione SSH per:

    1. Crea una tabella HBase
    2. (Utenti Java): esegui comandi sul nodo master del cluster per determinare le versioni dei componenti installati sul cluster
    3. Scansiona la tabella Hbase dopo esegui il codice

Creare una tabella HBase

Esegui i comandi elencati in questa sezione nel terminale della sessione SSH del nodo master che hai aperto nel passaggio precedente.

  1. Apri la shell HBase:

    hbase shell
    

  2. Crea un oggetto "my-table" HBase con un "cf" famiglia di colonne:

    create 'my_table','cf'
    

    1. Per confermare la creazione della tabella, nella console Google Cloud fai clic su HBase nei link della console Google Cloud Component Gateway per aprire l'interfaccia utente di Apache HBase. my-table è elencato in nella sezione Tabelle della Home page.

Visualizza il codice Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Esegui il codice

  1. Apri un terminale della sessione Cloud Shell.

  2. Clona il repository GitHub GoogleCloudDataproc/cloud-dataproc nel terminale della sessione Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Passa alla directory cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Esempio di output:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Inviare il job di Dataproc.

Java

  1. Imposta le versioni dei componenti nel file pom.xml.
    1. Dataproc Versioni di release 2.0.x pagina elenca le versioni dei componenti Scala, Spark e HBase installate con la più recente e le ultime quattro versioni secondarie dell'immagine 2.0.
      1. Per trovare la versione secondaria del cluster della versione dell'immagine 2.0, fai clic sul nome del cluster nella pagina Cluster nella console Google Cloud per aprire la pagina Dettagli cluster, in cui il cluster Versione immagine.
    2. In alternativa, puoi eseguire i seguenti comandi in un terminal di sessione SSH dal nodo principale del cluster per determinare le versioni dei componenti:
      1. Verifica la versione scala:
        scala -version
        
      2. Controlla la versione di Spark (CTRL+D per uscire):
        spark-shell
        
      3. Controlla la versione di HBase:
        hbase version
        
      4. Identificare le dipendenze delle versioni Spark, Scala e HBase nel modulo Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version è la versione attuale del connettore Spark HBase. lascia invariato il numero di versione.
    3. Modifica il file pom.xml nell'editor di Cloud Shell per inserire i numeri di versione corretti di Scala, Spark e HBase. Al termine della modifica, fai clic su Apri terminale per tornare a nella riga di comando del terminale Cloud Shell.
      cloudshell edit .
      
    4. Passa a Java 8 in Cloud Shell. Questa versione di JDK è necessaria per compilare il codice (puoi ignorare eventuali messaggi di avviso del plug-in):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica l'installazione di Java 8:
      java -version
      
      Esempio di output:
      openjdk version "1.8..."
       
  2. Crea il file jar:
    mvn clean package
    
    Il file .jar è posizionato nella sottodirectory /target (ad esempio, target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Invia il job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: inserisci il nome del file .jar dopo "target/" e prima di ".jar".
    • Se non hai impostato il driver Spark e i percorsi di classe HBase dell'esecutore creato il tuo cluster, devi impostarle a ogni invio di job includendo seguente flag ‑‑properties nel comando di invio del job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualizza l'output della tabella HBase nell'output del terminale della sessione Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Invia il job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Se non hai impostato il driver Spark e i percorsi di classe HBase dell'esecutore creato il tuo cluster, devi impostarle a ogni invio di job includendo seguente flag ‑‑properties nel comando di invio del job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualizza l'output della tabella HBase nell'output del terminale della sessione Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Esegui la scansione della tabella HBase

Puoi scansionare il contenuto della tabella HBase tramite eseguendo i comandi riportati di seguito nella sessione SSH del nodo master che hai aperto in Verifica l'installazione del connettore:

  1. Apri la shell HBase:
    hbase shell
    
  2. Cerca "my-table":
    scan 'my_table'
    
    Esempio di output:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Esegui la pulizia

Al termine del tutorial, puoi eseguire la pulizia delle risorse che hai creato in modo che smettono di usare la quota e comportano addebiti. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione creato per il tutorial.

Per eliminare il progetto:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Elimina il cluster

  • Per eliminare il cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}