Utilizzo di Apache Spark con HBase su Dataproc


Obiettivi

Questo tutorial ti mostra come:

  1. Creare un cluster Dataproc, installando Apache HBase e Apache ZooKeeper sul cluster
  2. Creare una tabella HBase utilizzando la shell HBase in esecuzione sul nodo master del cluster Dataproc
  3. Usa Cloud Shell per inviare un job Java o PySpark Spark al servizio Dataproc che scrive i dati e poi li legge nella tabella HBase.

Costi

In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud possono essere idonei a una prova senza costi aggiuntivi.

Prima di iniziare

Se non l'hai ancora fatto, crea un progetto per la Google Cloud Platform.

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Dataproc and Compute Engine.

    Abilita le API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  7. Abilita le API Dataproc and Compute Engine.

    Abilita le API

Crea un cluster Dataproc

  1. Esegui questo comando in un terminale di sessione Cloud Shell per:

    • Installa i componenti HBase e ZooKeeper.
    • Esegui il provisioning di tre nodi worker (sono consigliati da tre a cinque worker per eseguire il codice in questo tutorial)
    • Attiva il gateway dei componenti
    • Utilizza immagine versione 2.0
    • Utilizza il flag --properties per aggiungere la configurazione HBase e la libreria HBase al driver Spark ed ai percorsi delle classi dell'esecutore.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifica l'installazione del connettore

  1. Dalla console Google Cloud o dal terminale di sessione Cloud Shell, esegui SSH nel nodo master del cluster Dataproc.

  2. Verifica l'installazione del connettore Spark HBase di Apache sul nodo master:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Esempio di output:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Tieni aperto il terminale di sessione SSH per:

    1. Crea una tabella HBase
    2. (Utenti Java): esegui comandi sul nodo master del cluster per determinare le versioni dei componenti installati sul cluster
    3. Scansiona la tabella Hbase dopo aver eseguito il codice

Crea una tabella HBase

Esegui i comandi elencati in questa sezione nel terminale di sessione SSH del nodo master che hai aperto nel passaggio precedente.

  1. Apri la shell di HBase:

    hbase shell
    

  2. Crea un valore HBase "my-table" con una famiglia di colonne "cf":

    create 'my_table','cf'
    

    1. Per confermare la creazione della tabella, nella console Google Cloud fai clic su HBase nei link del gateway dei componenti della console Google Cloud per aprire l'interfaccia utente di Apache HBase. my-table è elencato nella sezione Tabelle della Home page.

Visualizza il codice Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Esegui il codice

  1. Apri un terminale di sessione Cloud Shell.

  2. Clona il repository GitHub GoogleCloudDataproc/cloud-dataproc nel terminale di sessione Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Passa alla directory cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Esempio di output:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Invia il job di Dataproc.

Java

  1. Imposta le versioni dei componenti nel file pom.xml.
    1. Nella pagina Versioni release 2.0.x di Dataproc sono elencate le versioni dei componenti Scala, Spark e HBase installate con le ultime quattro versioni secondarie di Image 2.0.
      1. Per trovare la versione secondaria del cluster della versione immagine 2.0, fai clic sul nome del cluster nella pagina Cluster della console Google Cloud per aprire la pagina Dettagli cluster, in cui è indicata la Versione immagine del cluster.
    2. In alternativa, puoi eseguire i comandi seguenti in un terminale di sessione SSH dal nodo master del tuo cluster per determinare le versioni dei componenti:
      1. Controlla la versione di Scala:
        scala -version
        
      2. Controlla la versione di Spark (Ctrl-D per uscire):
        spark-shell
        
      3. Controlla la versione di HBase:
        hbase version
        
      4. Identifica le dipendenze delle versioni Spark, Scala e HBase in Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version è la versione attuale del connettore Spark HBase; lascia invariato questo numero di versione.
    3. Modifica il file pom.xml nell'editor di Cloud Shell per inserire i numeri di versione di Scala, Spark e HBase corretti. Al termine della modifica, fai clic su Apri terminale per tornare alla riga di comando del terminale Cloud Shell.
      cloudshell edit .
      
    4. Passa a Java 8 in Cloud Shell. Questa versione di JDK è necessaria per creare il codice (puoi ignorare gli eventuali messaggi di avviso relativi ai plug-in):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica l'installazione di Java 8:
      java -version
      
      Esempio di output:
      openjdk version "1.8..."
       
  2. Crea il file jar:
    mvn clean package
    
    Il file .jar si trova nella sottodirectory /target, ad esempio target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Invia il job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: inserisci il nome del file .jar dopo "target/" e prima di ".jar".
    • Se non hai impostato i classpath HBase del driver ed esecutore Spark quando hai creato il cluster, devi impostarli per ogni invio del job includendo il seguente flag ‑‑properties nel comando di invio del job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualizza l'output della tabella HBase nell'output del terminale della sessione Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Invia il job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Se non hai impostato i classpath HBase del driver ed esecutore Spark quando hai creato il cluster, devi impostarli per ogni invio del job includendo il seguente flag ‑‑properties nel comando di invio del job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualizza l'output della tabella HBase nell'output del terminale della sessione Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Analizza la tabella HBase

Puoi analizzare i contenuti della tabella HBase eseguendo i seguenti comandi nel terminale di sessione SSH del nodo master che hai aperto in Verifica l'installazione del connettore:

  1. Apri la shell di HBase:
    hbase shell
    
  2. Scansiona "my-table":
    scan 'my_table'
    
    Esempio di output:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Esegui la pulizia

Al termine del tutorial, puoi eseguire la pulizia delle risorse che hai creato in modo che smettano di utilizzare la quota e smettano di essere addebitati. Le sezioni seguenti descrivono come eliminare o disattivare queste risorse.

Elimina il progetto

Il modo più semplice per eliminare la fatturazione è eliminare il progetto che hai creato per il tutorial.

Per eliminare il progetto:

  1. Nella console Google Cloud, vai alla pagina Gestisci risorse.

    Vai a Gestisci risorse

  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare, quindi fai clic su Elimina.
  3. Nella finestra di dialogo, digita l'ID del progetto e fai clic su Chiudi per eliminare il progetto.

Eliminare il cluster

  • Per eliminare il cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}