Usa Apache Spark con HBase en Dataproc

Objetivos

En este instructivo, se muestra cómo hacer lo siguiente:

  1. Crea un clúster de Dataproc y, luego, instala Apache HBase y Apache ZooKeeper en él.
  2. Crea una tabla de HBase con la shell de HBase que se ejecuta en el nodo principal del clúster de Dataproc
  3. Usa Cloud Shell para enviar un trabajo de Spark en Java o PySpark al servicio de Dataproc que escribe datos en la tabla de HBase y, luego, lee datos de ella.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Es posible que los usuarios de Google Cloud nuevos cumplan con los requisitos para acceder a una prueba gratuita.

Antes de comenzar

Si aún no lo hiciste, crea un proyecto de Google Cloud Platform.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  8. Crea un clúster de Dataproc

    1. Ejecuta el siguiente comando en una terminal de sesión de Cloud Shell para hacer lo siguiente:

      • Instala los componentes HBase y ZooKeeper.
      • Aprovisiona tres nodos trabajadores (se recomienda usar entre tres y cinco trabajadores para ejecutar el código en este instructivo).
      • Habilita la puerta de enlace de componentes.
      • Usar la versión de imagen 2.0
      • Usa la marca --properties para agregar la configuración y la biblioteca de HBase a las rutas de acceso de clase del controlador y el ejecutor de Spark.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    Verifica la instalación del conector

    1. Desde la Google Cloud consola o una terminal de sesión de Cloud Shell, establece una conexión SSH al instancia principal del clúster de Dataproc.

    2. Verifica la instalación del conector de Apache HBase Spark en el nodo principal:

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      Resultado de muestra:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. Mantén abierta la terminal de la sesión SSH para hacer lo siguiente:

      1. Crea una tabla de HBase
      2. (Usuarios de Java): Ejecuta comandos en el nodo principal del clúster para determinar las versiones de los componentes instalados en el clúster.
      3. Analiza tu tabla de HBase después de ejecutar el código.

    Crea una tabla de HBase

    Ejecuta los comandos que se indican en esta sección en la terminal de la sesión de SSH del nodo principal que abriste en el paso anterior.

    1. Abre la shell de HBase:

      hbase shell
      

    2. Crea una tabla "my-table" de HBase con una familia de columnas "cf":

      create 'my_table','cf'
      

      1. Para confirmar la creación de la tabla, en la Google Cloud consola, haz clic en HBase en los vínculos de la puerta de enlace de componentes de la consolaGoogle Cloud para abrir la IU de Apache HBase. my-table aparece en la sección Tablas de la página Página principal.

    Cómo ver el código de Spark

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    Ejecuta el código

    1. Abre una terminal de sesión de Cloud Shell.

    2. Clona el repositorio de GitHub GoogleCloudDataproc/cloud-dataproc en la terminal de tu sesión de Cloud Shell:

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. Cambia al directorio cloud-dataproc/spark-hbase:

      cd cloud-dataproc/spark-hbase
      
      Resultado de muestra:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Envía el trabajo de Dataproc.

    Java

    1. Configura las versiones de los componentes en el archivo pom.xml.
      1. En la página de versiones de actualización 2.0.x de Dataproc, se enumeran las versiones de los componentes de Scala, Spark y HBase instalados con las cuatro últimas versiones secundarias de la imagen 2.0 y la más reciente.
        1. Para encontrar la versión secundaria de tu clúster de versión de imagen 2.0, haz clic en el nombre del clúster en la página Clústeres de la consola deGoogle Cloud para abrir la página Detalles del clúster, en la que se muestra la Versión de imagen del clúster.
      2. Como alternativa, puedes ejecutar los siguientes comandos en una terminal de sesión SSH desde el nodo principal de tu clúster para determinar las versiones de los componentes:
        1. Verifica la versión de Scala:
          scala -version
          
        2. Verifica la versión de Spark (Ctrl+D para salir):
          spark-shell
          
        3. Verifica la versión de HBase:
          hbase version
          
        4. Identifica las dependencias de versión de Spark, Scala y HBase en pom.xml de Maven:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          Nota: hbase-spark.version es la versión actual del conector de HBase de Spark. No cambies este número de versión.
      3. Edita el archivo pom.xml en el editor de Cloud Shell para insertar los números de versión correctos de Scala, Spark y HBase. Haz clic en Abrir terminal cuando termines de editar para volver a la línea de comandos de la terminal de Cloud Shell.
        cloudshell edit .
        
      4. Cambia a Java 8 en Cloud Shell. Esta versión del JDK es necesaria para compilar el código (puedes ignorar cualquier mensaje de advertencia del complemento):
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Verifica la instalación de Java 8:
        java -version
        
        Resultado de muestra:
        openjdk version "1.8..."
         
    2. Compila el archivo jar:
      mvn clean package
      
      El archivo .jar se coloca en el subdirectorio /target (por ejemplo, target/spark-hbase-1.0-SNAPSHOT.jar).
    3. Envía el trabajo.

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars: Inserta el nombre de tu archivo .jar después de "target/" y antes de ".jar".
      • Si no estableciste las rutas de acceso de clase de HBase del controlador y el ejecutor de Spark cuando creaste tu clúster, debes establecerlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. Consulta el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. Envía el trabajo.

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • Si no estableciste las rutas de acceso de clase de HBase del controlador y el ejecutor de Spark cuando creaste tu clúster, debes establecerlas con cada envío de trabajo. Para ello, incluye la siguiente marca ‑‑properties en el comando de envío de trabajo:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. Consulta el resultado de la tabla de HBase en el resultado de la terminal de la sesión de Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Analiza la tabla de HBase

    Para analizar el contenido de tu tabla de HBase, ejecuta los siguientes comandos en la terminal de la sesión de SSH del nodo principal que abriste en Verifica la instalación del conector:

    1. Abre la shell de HBase:
      hbase shell
      
    2. Analiza "my-table":
      scan 'my_table'
      
      Resultado de muestra:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      Limpia

      Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.

      Borra el proyecto

      La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

      Para borrar el proyecto, sigue estos pasos:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      Borra el clúster

      • Para borrar tu clúster, realiza los siguientes pasos:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}