Usa Apache Spark con HBase en Dataproc


Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  1. Crear un clúster de Dataproc y, luego, instalar Apache HBase y Apache ZooKeeper en el clúster
  2. Crear una tabla de HBase con la shell de HBase que se ejecuta en el nodo de la instancia principal del clúster de Dataproc
  3. Usa Cloud Shell para enviar un trabajo de Java o PySpark Spark al servicio de Dataproc que escribe datos en la tabla de HBase y, luego, los lee

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Si aún no lo hiciste, crea un proyecto de Google Cloud Platform.

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. Habilita las API de Dataproc and Compute Engine.

    Habilita las API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  7. Habilita las API de Dataproc and Compute Engine.

    Habilita las API

Crea un clúster de Dataproc

  1. Ejecuta el siguiente comando en una terminal de sesión de Cloud Shell para hacer lo siguiente:

    • Instala los componentes de HBase y ZooKeeper.
    • Aprovisiona tres nodos trabajadores (se recomienda entre tres y cinco trabajadores para ejecutar el código en este instructivo).
    • Habilita la puerta de enlace de componentes.
    • Usar la versión de la imagen 2.0
    • Usa la marca --properties para agregar la configuración y la biblioteca de HBase a las rutas de clase del controlador y ejecutor de Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifica la instalación del conector

  1. Desde la consola de Google Cloud o una terminal de la sesión de Cloud Shell, establece una conexión SSH al instancia principal del clúster de Dataproc.

  2. Verifica la instalación del conector de Apache HBase Spark en el nodo principal:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Resultado de muestra:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantén abierta la terminal de la sesión SSH para hacer lo siguiente:

    1. Crea una tabla de HBase
    2. (Usuarios de Java): ejecuta comandos en el nodo principal del clúster para determinar las versiones de los componentes instalados en el clúster.
    3. Escanea tu tabla de Hbase después de ejecutar el código

Crea una tabla de HBase

Ejecuta los comandos enumerados en esta sección en la terminal de la sesión SSH del nodo principal que abriste en el paso anterior.

  1. Abre la shell de HBase:

    hbase shell
    

  2. Crea una “my-table” de HBase con una familia de columnas “cf”:

    create 'my_table','cf'
    

    1. Para confirmar la creación de la tabla, en la consola de Google Cloud, haz clic en HBase en los vínculos de puerta de enlace de componentes de la consola de Google Cloud para abrir la IU de Apache HBase. my-table aparece en la sección Tablas de la página principal.

Visualiza el código de Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Ejecuta el código

  1. Abre una terminal de sesión de Cloud Shell.

  2. Clona el repositorio de GitHub GoogleCloudDataproc/cloud-dataproc en tu terminal de sesión de Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Cambia al directorio cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Resultado de muestra:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envía el trabajo de Dataproc.

Java

  1. Establece las versiones de los componentes en el archivo pom.xml.
    1. En la página Versiones de actualización 2.0.x de Dataproc, se enumeran las versiones de componentes de Scala, Spark y HBase instaladas con las cuatro versiones submenores de imagen 2.0 más recientes y las últimas cuatro.
      1. Para encontrar la versión submenor del clúster de la versión de imagen 2.0, haz clic en el nombre del clúster en la página Clústeres de Google Cloud Console para abrir la página Detalles del clúster, en la que aparece la Versión de la imagen del clúster.
    2. Como alternativa, puedes ejecutar los siguientes comandos en una terminal de sesión SSH desde el nodo principal de tu clúster para determinar las versiones de los componentes:
      1. Verifica la versión de scala:
        scala -version
        
      2. Comprueba la versión de Spark (control-D para salir):
        spark-shell
        
      3. Verifica la versión de HBase:
        hbase version
        
      4. Identifica las dependencias de las versiones de Spark, Scala y HBase en pom.xml de Maven:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version es la versión actual del conector de Spark HBase. No modifiques este número de versión.
    3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión correctos de Scala, Spark y HBase. Cuando termines de editar, haz clic en Abrir terminal para volver a la línea de comandos de la terminal de Cloud Shell.
      cloudshell edit .
      
    4. Cambia a Java 8 en Cloud Shell. Esta versión de JDK es necesaria para compilar el código (puedes ignorar cualquier mensaje de advertencia del complemento):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica la instalación de Java 8:
      java -version
      
      Resultado de muestra:
      openjdk version "1.8..."
       
  2. Compila el archivo jar:
    mvn clean package
    
    El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Envía el trabajo.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Inserta el nombre de tu archivo .jar después de "target/" y antes de ".jar".
    • Si no configuraste las rutas de clase del controlador de Spark y del ejecutor de HBase cuando creaste tu clúster, debes configurarlas con cada envío de trabajos. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajos:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envía el trabajo.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Si no configuraste las rutas de clase del controlador de Spark y del ejecutor de HBase cuando creaste tu clúster, debes configurarlas con cada envío de trabajos. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajos:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Analiza la tabla de HBase

Puedes analizar el contenido de tu tabla de HBase mediante la ejecución de los siguientes comandos en la terminal de sesión SSH del nodo principal que abriste en Verificar la instalación del conector:

  1. Abre la shell de HBase:
    hbase shell
    
  2. Escanear "my-table":
    scan 'my_table'
    
    Resultado de muestra:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Limpia

Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haga lo siguiente:

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borrar el clúster

  • Para borrar tu clúster, haz lo siguiente:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}