Usar o Apache Spark com o HBase no Dataproc

Objetivos

Neste tutorial, mostramos como fazer as seguintes tarefas:

  1. Crie um cluster do Dataproc, instalando o Apache HBase e o Apache ZooKeeper nele.
  2. Crie uma tabela do HBase usando o shell do HBase em execução no nó principal do cluster do Dataproc.
  3. Use o Cloud Shell para enviar um job do Spark em Java ou PySpark ao serviço Dataproc, que grava e lê dados da tabela do HBase.

Custos

Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na sua projeção de uso, use a calculadora de preços.

Novos usuários do Google Cloud podem estar qualificados para um teste gratuito.

Antes de começar

Se ainda não tiver feito, crie um projeto do Google Cloud Platform.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Criar um cluster do Dataproc

    1. Execute o comando a seguir em um terminal de sessão do Cloud Shell para:

      • Instale os componentes HBase e ZooKeeper.
      • Provisione três nós de trabalho (recomenda-se de três a cinco workers para executar o código neste tutorial).
      • Ative o Gateway de componentes.
      • Usar a versão de imagem 2.0
      • Use a flag --properties para adicionar a configuração e a biblioteca do HBase aos caminhos de classe do driver e do executor do Spark.
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    Verificar a instalação do conector

    1. No console Google Cloud ou em um terminal de sessão do Cloud Shell, execute o SSH no nó mestre do cluster do Dataproc.

    2. Verifique a instalação do conector do Apache HBase Spark no nó mestre:

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      Exemplo de saída:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. Mantenha o terminal da sessão SSH aberto para:

      1. Criar uma tabela do HBase
      2. (Usuários do Java): execute comandos no nó mestre do cluster para determinar as versões dos componentes instalados no cluster.
      3. Faça a leitura da sua tabela do HBase depois de executar o código.

    Criar uma tabela do HBase

    Execute os comandos listados nesta seção no terminal da sessão SSH do nó mestre que você abriu na etapa anterior.

    1. Abra o shell do HBase:

      hbase shell
      

    2. Crie uma tabela "my-table" do HBase com um grupo de colunas "cf":

      create 'my_table','cf'
      

      1. Para confirmar a criação da tabela, no console do Google Cloud , clique em HBase nos links do Component Gateway do console doGoogle Cloud para abrir a interface do Apache HBase. my-table é listado na seção Tabelas da página Início.

    Ver o código do Spark

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    Executar o código

    1. Abra um terminal de sessão do Cloud Shell.

    2. Clone o repositório GoogleCloudDataproc/cloud-dataproc do GitHub no terminal da sessão do Cloud Shell:

      git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. Altere para o diretório cloud-dataproc/spark-hbase:

      cd cloud-dataproc/spark-hbase
      
      Exemplo de saída:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. Envie o job do Dataproc.

    Java

    1. Defina as versões dos componentes no arquivo pom.xml.
      1. A página versões de lançamento do Dataproc 2.0.x lista as versões dos componentes Scala, Spark e HBase instaladas com as quatro últimas versões secundárias da imagem 2.0 e a mais recente.
        1. Para encontrar a versão subalterna do cluster de versão de imagem 2.0, clique no nome do cluster na página Clusters noGoogle Cloud console para abrir a página Detalhes do cluster, em que a Versão da imagem do cluster está listada.
      2. Como alternativa, execute os seguintes comandos em um terminal de sessão SSH do nó mestre do cluster para determinar as versões dos componentes:
        1. Verifique a versão do Scala:
          scala -version
          
        2. Verifique a versão do Spark (pressione Control+D para sair):
          spark-shell
          
        3. Verifique a versão do HBase:
          hbase version
          
        4. Identifique as dependências de versão do Spark, Scala e HBase no Maven pom.xml:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          Observação: hbase-spark.version é a versão atual do conector do Spark HBase. Deixe esse número de versão inalterado.
      3. Edite o arquivo pom.xml no editor do Cloud Shell para inserir os números de versão corretos do Scala, do Spark e do HBase. Clique em Abrir terminal quando terminar de editar para voltar à linha de comando do terminal do Cloud Shell.
        cloudshell edit .
        
      4. Mude para o Java 8 no Cloud Shell. Essa versão do JDK é necessária para criar o código. Ignore as mensagens de aviso do plug-in:
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. Verifique a instalação do Java 8:
        java -version
        
        Exemplo de saída:
        openjdk version "1.8..."
         
    2. Crie o arquivo jar:
      mvn clean package
      
      O arquivo .jar é colocado no subdiretório /target (por exemplo, target/spark-hbase-1.0-SNAPSHOT.jar.
    3. Envie o job.

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars: insira o nome do arquivo .jar depois de "target/" e antes de ".jar".
      • Se você não definiu os caminhos de classe do HBase do driver e do executor do Spark ao criar o cluster, defina-os com cada envio de job incluindo a seguinte flag ‑‑properties no comando de envio:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. Confira a saída da tabela do HBase na saída do terminal da sessão do Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. Envie o job.

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • Se você não definiu os caminhos de classe do HBase do driver e do executor do Spark ao criar o cluster, defina-os com cada envio de job incluindo a seguinte flag ‑‑properties no comando de envio:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. Confira a saída da tabela do HBase na saída do terminal da sessão do Cloud Shell:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Verificar a tabela do HBase

    Para verificar o conteúdo da sua tabela do HBase, execute os seguintes comandos no terminal da sessão SSH do nó principal que você abriu em Verificar a instalação do conector:

    1. Abra o shell do HBase:
      hbase shell
      
    2. Verificar "my-table":
      scan 'my_table'
      
      Exemplo de saída:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

      Limpar

      Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

      Exclua o projeto

      O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

      Para excluir o projeto:

      1. In the Google Cloud console, go to the Manage resources page.

        Go to Manage resources

      2. In the project list, select the project that you want to delete, and then click Delete.
      3. In the dialog, type the project ID, and then click Shut down to delete the project.

      excluir o cluster

      • Para excluir o cluster:
        gcloud dataproc clusters delete cluster-name \
            --region=${REGION}