Usa Apache Spark con HBase en Dataproc


Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  1. Crear un clúster de Dataproc, instalar Apache HBase y Apache ZooKeeper en el clúster
  2. Crea una tabla de HBase con la shell de HBase que se ejecuta en el nodo principal de el clúster de Dataproc
  3. Usa Cloud Shell para enviar un trabajo de Spark de Java o PySpark al servicio de Dataproc que escribe datos en la tabla de HBase y, luego, los lee.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

Si aún no lo hiciste, crea un proyecto de Google Cloud Platform.

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Crea un clúster de Dataproc

  1. Ejecuta el siguiente comando en la terminal de una sesión de Cloud Shell para lo siguiente:

    • Instala los componentes HBase y ZooKeeper.
    • Aprovisiona tres nodos trabajadores (se recomienda que uses entre tres y cinco trabajadores para ejecuta el código de este instructivo)
    • Habilita la Puerta de enlace de componentes.
    • Usa la versión 2.0 de la imagen
    • Usa la marca --properties para agregar la configuración y la biblioteca de HBase las rutas de clase del controlador y el ejecutor de Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifica la instalación del conector

  1. Desde la consola de Google Cloud o la terminal de una sesión de Cloud Shell, establece una conexión SSH al nodo principal del clúster de Dataproc.

  2. Verifica la instalación del conector de Apache HBase Spark en el nodo principal:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Resultado de muestra:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantén abierta la terminal de la sesión de SSH para lo siguiente:

    1. Crea una tabla de HBase
    2. (Usuarios de Java): ejecuta comandos en la instancia principal. del clúster para determinar las versiones de los componentes instalados el clúster
    3. Analiza tu tabla Hbase después de ejecutar el código

Crea una tabla de HBase

Ejecuta los comandos que se indican en esta sección en la terminal de la sesión de SSH del nodo maestro que abriste en el paso anterior.

  1. Abre la shell de HBase:

    hbase shell
    

  2. Crea un recurso “my-table” de HBase con un “cf” familia de columnas:

    create 'my_table','cf'
    

    1. Para confirmar la creación de la tabla, haz clic en HBase en la consola de Google Cloud. en los vínculos de la puerta de enlace de los componentes de la consola de Google Cloud para abrir la IU de Apache HBase. my-table aparece en la sección Tablas de la Página principal.

Ve el código de Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;


        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Ejecuta el código

  1. Abre una terminal de sesión de Cloud Shell.

  2. Clona el repositorio GoogleCloudDataproc/cloud-dataproc de GitHub en la terminal de la sesión de Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Cambia al directorio cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Resultado de muestra:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envía el trabajo de Dataproc.

Java

  1. Establece las versiones de los componentes en el archivo pom.xml.
    1. El servicio de Dataproc Versiones de actualización 2.0.x enumera las versiones de los componentes de Scala, Spark y HBase instaladas con las últimas cuatro versiones de imágenes 2.0 submenores.
      1. Para encontrar la versión submenor del clúster de versiones con imágenes 2.0, haz clic en el nombre del clúster Clústeres en la La consola de Google Cloud para abrir la página Detalles del clúster, en la que del clúster Versión de la imagen.
    2. Como alternativa, puedes ejecutar los siguientes comandos en un Terminal de sesión SSH desde el nodo principal de tu clúster para determinar las versiones de los componentes:
      1. Verifica la versión de Scala:
        scala -version
        
      2. Verifica la versión de Spark (Control D para salir):
        spark-shell
        
      3. Verifica la versión de HBase:
        hbase version
        
      4. Identificar las dependencias de las versiones de Spark, Scala y HBase en el módulo pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Nota: hbase-spark.version es la versión actual del conector de HBase de Spark. No cambies este número de versión.
    3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión correctos de Scala, Spark y HBase. Haz clic en Abrir terminal cuando termines de editar para volver. la línea de comandos de la terminal de Cloud Shell.
      cloudshell edit .
      
    4. Cambia a Java 8 en Cloud Shell. Esta versión de JDK es necesaria para Compila el código (puedes ignorar los mensajes de advertencia sobre complementos):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifica la instalación de Java 8:
      java -version
      
      Resultado de muestra:
      openjdk version "1.8..."
       
  2. Compila el archivo jar:
    mvn clean package
    
    El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Envía el trabajo.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Inserta el nombre del archivo .jar después de "target/". y antes de ".jar".
    • Si no configuraste las instrucciones de clase de HBase del controlador y el ejecutor de Spark cuando creaste tu clúster, debes configurarlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envía el trabajo.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Si no configuraste las rutas de clase de HBase y controlador de Spark cuando creaste tu clúster, debes establecerlos con cada envío de trabajo incluyendo siguiente marca ‑‑properties en el comando de envío de trabajo:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Visualiza el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Escanea la tabla de HBase

Puedes analizar el contenido de tu tabla de HBase ejecutando los siguientes comandos en la sesión SSH del nodo principal terminal que abriste en Verifica la instalación del conector:

  1. Abre la shell de HBase:
    hbase shell
    
  2. Analiza “my-table”:
    scan 'my_table'
    
    Resultado de muestra:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Limpia

Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haga lo siguiente:

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra el clúster

  • Para borrar tu clúster, realiza los siguientes pasos:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}