Usa Apache Spark con HBase en Dataproc

Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  1. Crear un clúster de Dataproc y, luego, instalar Apache HBase y Apache ZooKeeper en el clúster
  2. Crea una tabla de HBase con la shell de HBase que se ejecuta en el nodo principal del clúster de Dataproc
  3. Usa Cloud Shell para enviar un trabajo de Spark de Java o PySpark al servicio de Dataproc que escribe datos en la tabla de HBase y, luego, los lee

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Si aún no lo hiciste, crea un proyecto de Google Cloud Platform.

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.

  4. Habilita las API de Dataproc and Compute Engine.

    Habilita las API

  5. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  6. Asegúrate de que la facturación esté habilitada para tu proyecto de Cloud. Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.

  7. Habilita las API de Dataproc and Compute Engine.

    Habilita las API

Crea un clúster de Dataproc

  1. Ejecuta el siguiente comando en una terminal de sesión de Cloud Shell para realizar lo siguiente:

    • Instala los componentes de HBase y ZooKeeper
    • Aprovisiona tres nodos trabajadores (se recomienda de tres a cinco trabajadores para ejecutar el código en este instructivo).
    • Habilita la puerta de enlace de componentes.
    • Usar versión de imagen 2.0
    • Usa la marca --properties para agregar la configuración de HBase y la biblioteca de HBase a las rutas de clase del controlador y el ejecutor de Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifica la instalación del conector

  1. Desde Cloud Console o una terminal de sesión de Cloud Shell, conéctate al nodo principal del clúster de Dataproc.

  2. Verifica la instalación del conector de Spark de Apache HBase en el nodo principal:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Resultado de muestra:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantenga la terminal de la sesión de SSH abierta para lo siguiente:

    1. Crea una tabla de HBase
    2. (Usuarios de Java): ejecuta comandos en el nodo principal del clúster para determinar las versiones de los componentes instalados en el clúster
    3. Analiza la tabla Hbase después de ejecutar el código.

Crea una tabla de HBase

Ejecuta los comandos que se mencionan en esta sección en la terminal de la sesión SSH del nodo principal que abriste en el paso anterior.

  1. Abre el shell de HBase:

    hbase shell
    

  2. Crear una HBase 'my-table' con una familia de columnas 'cf&#39:

    create 'my_table','cf'
    

    1. Para confirmar la creación de la tabla, en Cloud Console, haz clic en HBase en los vínculos de Puerta de enlace de componentes de Cloud Console a fin de abrir la IU de Apache HBase. my-table aparece en la sección Tablas de la página Página principal.

Visualiza el código de Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Ejecuta el código

  1. Abre una terminal de sesión de Cloud Shell.

  2. Clona el repositorio GoogleCloudDataproc/cloud-dataproc de GitHub en la terminal de la sesión de Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Cambia al directorio cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Resultado de muestra:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envía el trabajo de Dataproc.

Java

  1. Configura las versiones de los componentes en el archivo pom.xml.
    1. En la página de versiones de la versión 2.0.x de Dataproc, se enumeran las versiones de los componentes de Scala, Spark y HBase instaladas con las últimas cuatro versiones secundarias de la imagen 2.0.
      1. Para encontrar la versión secundaria del clúster de la versión de imagen 2.0, haz clic en el nombre del clúster en la página Clústeres de Google Cloud Console para abrir la página Detalles del clúster, en la que aparece la Versión de la imagen del clúster.
    2. Como alternativa, puedes ejecutar los siguientes comandos en una terminación de sesión SSH desde el nodo principal del clúster para determinar las versiones de los componentes:
      1. Verifica la versión de escalar:
        scala -version
        
      2. Verifica la versión de Spark (control-D para salir):
        spark-shell
        
      3. Verifica la versión de HBase:
        hbase version
        
      4. Identifica las dependencias de la versión de Spark, Scala y HBase en el pom.xml de Maven:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version es la versión actual del conector de HBase para Spark. Deja este número de versión sin cambios.
    3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión de Scala, Spark y HBase correctos. Haz clic en Abrir terminal cuando termines de editar para volver a la línea de comandos de la terminal de Cloud Shell.
      cloudshell edit .
      
    4. Cambia a Java 8 en Cloud Shell. Esta versión de JDK es necesaria para compilar el código (puedes ignorar cualquier mensaje de advertencia del complemento):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica la instalación de Java 8:
      java -version
      
      Resultado de muestra:
      openjdk version "1.8..."
       
  2. Compila el archivo jar:
    mvn clean package
    
    El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Envía el trabajo.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Inserta el nombre del archivo .jar después de "target/" y antes de &quot.jar".
    • Si no configuraste el controlador Spark y las rutas de clase del ejecutor de HBase cuando creaste el clúster, debes configurarlos con cada envío de trabajo mediante la inclusión de la siguiente marca ‑‑properties en el comando de envío de trabajos:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envía el trabajo.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Si no configuraste el controlador Spark y las rutas de clase del ejecutor de HBase cuando creaste el clúster, debes configurarlos con cada envío de trabajo mediante la inclusión de la siguiente marca ‑‑properties en el comando de envío de trabajos:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Analiza la tabla de HBase

Puedes analizar el contenido de tu tabla de HBase si ejecutas los siguientes comandos en la terminal de la sesión SSH del nodo principal que abriste en Verifica la instalación del conector:

  1. Abre el shell de HBase:
    hbase shell
    
  2. Escanear 'mi-tabla':
    scan 'my_table'
    
    Resultado de muestra:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Limpia

Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En la consola de Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra el clúster

  • Para borrar tu clúster, haz lo siguiente:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}