Usar o Apache Spark com o HBase no Dataproc


Objetivos

Neste tutorial, mostramos como fazer as seguintes tarefas:

  1. Criar um cluster do Dataproc, instalando o Apache HBase e o Apache ZooKeeper.
  2. Crie uma tabela do HBase usando o shell do HBase em execução no nó mestre do cluster do Dataproc.
  3. Usar o Cloud Shell para enviar um job do Spark em Java ou PySpark ao serviço do Dataproc que grava dados e, em seguida, lê dados da tabela do HBase.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Se ainda não tiver feito, crie um projeto do Google Cloud Platform.

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. Ative as APIs Dataproc and Compute Engine.

    Ative as APIs

  5. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  6. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  7. Ative as APIs Dataproc and Compute Engine.

    Ative as APIs

Criar um cluster do Dataproc

  1. Execute o comando a seguir em um terminal de sessão do Cloud Shell para:

    • Instale os componentes HBase e ZooKeeper (links em inglês)
    • Provisione três nós de trabalho. Recomendamos de três a cinco workers para executar o código neste tutorial.
    • Ative o Gateway de componentes
    • Usar a versão 2.0 da imagem
    • Use a sinalização --properties para adicionar a configuração e a biblioteca do HBase aos caminhos de classe do driver e do executor do Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verifique a instalação do conector

  1. No console do Google Cloud ou em um terminal de sessão do Cloud Shell, SSH no nó mestre do cluster do Dataproc.

  2. Verifique a instalação do conector do Apache HBase Spark no nó mestre:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Exemplo de resposta:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantenha o terminal da sessão SSH aberto para:

    1. Crie uma tabela do HBase
    2. (Usuários do Java): execute comandos no nó mestre do cluster para determinar as versões dos componentes instalados nele.
    3. Verifique a tabela Hbase depois de executar o código.

Criar uma tabela do HBase

Execute os comandos listados nesta seção no terminal de sessão SSH do nó mestre que você abriu na etapa anterior.

  1. Abra o shell do HBase:

    hbase shell
    

  2. Crie uma "my-table" do HBase com um grupo de colunas "cf":

    create 'my_table','cf'
    

    1. Para confirmar a criação da tabela, no console do Google Cloud, clique em HBase nos links do Gateway de componentes do console do Google Cloud para abrir a interface do Apache HBase. my-table está listado na seção Tabelas na Página inicial.

Acessar o código Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Executar o código

  1. Abra um terminal de sessão do Cloud Shell.

  2. Clone o repositório do GitHub GoogleCloudDataproc/cloud-dataproc no terminal de sessão do Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Altere para o diretório cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Exemplo de resposta:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envie o job do Dataproc.

Java

  1. Defina as versões dos componentes no arquivo pom.xml.
    1. A página Versões de lançamento 2.0.x do Dataproc lista as versões dos componentes Scala, Spark e HBase instaladas com as últimas quatro versões subsecundárias da imagem 2.0 e a mais recente.
      1. Para encontrar a versão subsecundária do cluster da versão de imagem 2.0, clique no nome do cluster na página Clusters do Console do Google Cloud para abrir a página Detalhes do cluster, em que a Versão da imagem do cluster está listada.
    2. Como alternativa, é possível executar os seguintes comandos em um terminal de sessão SSH no nó mestre do cluster para determinar as versões dos componentes:
      1. Verifique a versão de escala:
        scala -version
        
      2. Verifique a versão do Spark (Control+D para sair):
        spark-shell
        
      3. Verifique a versão do HBase:
        hbase version
        
      4. Identifique as dependências de versão do Spark, do Scala e do HBase no pom.xml do Maven:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Observação: o hbase-spark.version é a versão atual do conector HBase do Spark. Não altere esse número.
    3. Edite o arquivo pom.xml no editor do Cloud Shell para inserir os números de versão corretos do Scala, do Spark e do HBase. Clique em Abrir terminal quando terminar de editar para retornar à linha de comando do terminal do Cloud Shell.
      cloudshell edit .
      
    4. Mude para o Java 8 no Cloud Shell. Essa versão do JDK é necessária para criar o código. Ignore qualquer mensagem de aviso do plug-in:
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifique a instalação do Java 8:
      java -version
      
      Exemplo de resposta:
      openjdk version "1.8..."
       
  2. Crie o arquivo jar:
    mvn clean package
    
    O arquivo .jar é colocado no subdiretório /target (por exemplo, target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Envie o job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: insira o nome do arquivo .jar depois de "target/" e antes de ".jar".
    • Se você não tiver definido o driver do Spark e os caminhos de classe do HBase do executor ao criar o cluster, precisará defini-los a cada envio de job. Para isso, inclua a seguinte sinalização ‑‑properties no comando de envio do job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Veja a saída da tabela do HBase na saída do terminal da sessão do Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envie o job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Se você não tiver definido o driver do Spark e os caminhos de classe do HBase do executor ao criar o cluster, precisará defini-los a cada envio de job. Para isso, inclua a seguinte sinalização ‑‑properties no comando de envio do job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Veja a saída da tabela do HBase na saída do terminal da sessão do Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Verificar a tabela do HBase

Verifique o conteúdo da tabela do HBase executando os seguintes comandos no terminal de sessão SSH do nó mestre que você abriu em Verificar a instalação do conector:

  1. Abra o shell do HBase:
    hbase shell
    
  2. Verificar "my-table":
    scan 'my_table'
    
    Exemplo de resposta:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Limpar

Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Exclua o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto, faça o seguinte:

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir o cluster

  • Para excluir o cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}