Usar Apache Spark con HBase en Dataproc

Objetivos

En este tutorial se explica cómo hacer lo siguiente:

  1. Crea un clúster de Dataproc e instala Apache HBase y Apache ZooKeeper en él
  2. Crea una tabla de HBase con el shell de HBase que se ejecuta en el nodo maestro del clúster de Dataproc
  3. Usar Cloud Shell para enviar una tarea de Spark de Java o PySpark al servicio Dataproc que escribe datos en la tabla de HBase y, a continuación, lee datos de ella

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Antes de empezar

Si aún no lo has hecho, crea un proyecto de Google Cloud Platform.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Crear una agrupación Dataproc

    1. Ejecuta el siguiente comando en un terminal de sesión de Cloud Shell para hacer lo siguiente:

      • Instala los componentes HBase y ZooKeeper.
      • Aprovisiona tres nodos de trabajador (se recomienda usar entre tres y cinco trabajadores para ejecutar el código de este tutorial).
      • Habilita Pasarela de componentes.
      • Usar la versión 2.0 de la imagen
      • Usa la marca --properties para añadir la configuración y la biblioteca de HBase a las rutas de clase del controlador y del ejecutor de Spark.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    Verificar la instalación del conector

    1. Desde la Google Cloud consola o un terminal de sesión de Cloud Shell, conéctate al nodo maestro del clúster de Dataproc mediante SSH.

    2. Verifica la instalación del conector Apache HBase Spark en el nodo maestro:

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      Ejemplo de salida:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. Mantén abierto el terminal de la sesión SSH para lo siguiente:

      1. Crear una tabla de HBase
      2. (Usuarios de Java): ejecuta comandos en el nodo maestro del clúster para determinar las versiones de los componentes instalados en el clúster.
      3. Escanea tu tabla de Hbase después de ejecutar el código.

    Crear una tabla de HBase

    Ejecuta los comandos que se indican en esta sección en la sesión SSH del terminal del nodo maestro que has abierto en el paso anterior.

    1. Abre el shell de HBase:

      hbase shell
      

    2. Crea una tabla de HBase llamada "my-table" con una familia de columnas "cf":

      create 'my_table','cf'
      

      1. Para confirmar la creación de la tabla, en la consola Google Cloud , haz clic en HBase en los enlaces de Component Gateway de la consolaGoogle Cloud para abrir la interfaz de usuario de Apache HBase. my-table se muestra en la sección Tablas de la página Principal.

    Ver el código de Spark

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    Ejecutar el código

    1. Abre una terminal de sesión de Cloud Shell.

    2. Clona el repositorio de GitHub GoogleCloudDataproc/cloud-dataproc en el terminal de tu sesión de Cloud Shell:

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. Cambia al directorio cloud-dataproc/spark-hbase:

      cd cloud-dataproc/spark-hbase
      
      Ejemplo de salida:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Envía la tarea de Dataproc.

    Java

    1. Define las versiones de los componentes en el archivo pom.xml.
      1. En la página Versiones de lanzamiento de Dataproc 2.0.x se indican las versiones de los componentes Scala, Spark y HBase instaladas con las cuatro últimas versiones secundarias de la imagen 2.0 y la más reciente.
        1. Para encontrar la versión secundaria de tu clúster con la versión de imagen 2.0, haz clic en el nombre del clúster en la página Clústeres de la consolaGoogle Cloud para abrir la página Detalles del clúster, donde se muestra la Versión de imagen del clúster.
      2. También puede ejecutar los siguientes comandos en un terminal de sesión SSH desde el nodo maestro de su clúster para determinar las versiones de los componentes:
        1. Comprueba la versión de Scala:
          scala -version
          
        2. Comprueba la versión de Spark (pulsa Ctrl + D para salir):
          spark-shell
          
        3. Comprueba la versión de HBase:
          hbase version
          
        4. Identifica las dependencias de las versiones de Spark, Scala y HBase en Maven pom.xml:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          Nota: hbase-spark.version es la versión actual del conector Spark HBase. No cambies este número de versión.
      3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión correctos de Scala, Spark y HBase. Cuando termines de editar, haz clic en Abrir terminal para volver a la línea de comandos del terminal de Cloud Shell.
        cloudshell edit .
        
      4. Cambia a Java 8 en Cloud Shell. Esta versión del JDK es necesaria para compilar el código (puedes ignorar los mensajes de advertencia de los complementos):
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Verifica la instalación de Java 8:
        java -version
        
        Ejemplo de salida:
        openjdk version "1.8..."
         
    2. Crea el archivo jar:
      mvn clean package
      
      El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar.
    3. Envía el trabajo.

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars: inserta el nombre de tu archivo .jar después de "target/" y antes de ".jar".
      • Si no has definido las rutas de clase de HBase del controlador y del ejecutor de Spark al crear el clúster, debes definirlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. Consulta la salida de la tabla de HBase en la salida del terminal de la sesión de Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. Envía el trabajo.

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • Si no has definido las rutas de clase de HBase del controlador y del ejecutor de Spark al crear el clúster, debes definirlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. Consulta la salida de la tabla de HBase en la salida del terminal de la sesión de Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Analizar la tabla de HBase

    Para analizar el contenido de tu tabla de HBase, ejecuta los siguientes comandos en el terminal de la sesión SSH del nodo maestro que has abierto en Verificar la instalación del conector:

    1. Abre el shell de HBase:
      hbase shell
      
    2. Escanear "mi-mesa":
      scan 'my_table'
      
      Ejemplo de salida:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      Limpieza

      Cuando hayas terminado el tutorial, puedes eliminar los recursos que has creado para que dejen de usar cuota y generar cargos. En las siguientes secciones se explica cómo eliminar o desactivar dichos recursos.

      Eliminar el proyecto

      La forma más fácil de evitar que te cobren es eliminar el proyecto que has creado para el tutorial.

      Para ello, sigue las instrucciones que aparecen a continuación:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      Elimina el clúster

      • Para eliminar tu clúster, sigue estos pasos:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}