Usar o Apache Spark com HBase no Dataproc

Objetivos

Neste tutorial, mostramos como fazer as seguintes tarefas:

  1. Criar um cluster do Dataproc, instalando o Apache HBase e o Apache ZooKeeper no cluster
  2. Criar uma tabela do HBase usando o shell do HBase em execução no nó mestre do cluster do Dataproc
  3. usar o Cloud Shell para enviar um job Java ou PySpark do Spark para o serviço do Dataproc que grava e lê dados na tabela HBase;

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Se ainda não tiver feito, crie um projeto do Google Cloud Platform.

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como verificar se o faturamento está ativado em um projeto.

  4. Ative as APIs Dataproc and Compute Engine.

    Ative as APIs

  5. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  6. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como verificar se o faturamento está ativado em um projeto.

  7. Ative as APIs Dataproc and Compute Engine.

    Ative as APIs

Criar um cluster do Dataproc

  1. Execute o seguinte comando em um terminal da sessão do Cloud Shell para:

    • Instale os componentes de HBase e ZooKeeper.
    • Provisione três nós de trabalho (três a cinco workers são recomendados para executar o código neste tutorial)
    • Ative o Gateway de Componentes
    • Usar a versão de imagem 2.0
    • Use a sinalização --properties para adicionar a configuração HBase e a biblioteca HBase para os caminhos de classe do driver e do executor do Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Verificar a instalação do conector

  1. No Console do Cloud ou em um terminal de sessão do Cloud Shell, SSH no nó mestre do cluster do Dataproc.

  2. Verifique a instalação do conector do Apache HBase Spark no nó mestre:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Exemplo de resposta:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Mantenha o terminal da sessão SSH aberto para:

    1. Criar uma tabela do HBase
    2. (usuários de Java): execute comandos no nó mestre do cluster para determinar as versões dos componentes instalados no cluster;
    3. Verifique a tabela do HBase depois de executar o código.

Crie uma tabela do HBase

Execute os comandos listados nesta seção no terminal da sessão SSH do nó mestre que você abriu na etapa anterior.

  1. Abra o shell do HBase:

    hbase shell
    

  2. Crie um HBase 'my-table' com um grupo de colunas 'cf'

    create 'my_table','cf'
    

    1. Para confirmar a criação da tabela, no Console do Cloud, clique em HBase nos links de gateway de componente do Console do Cloud para abrir a IU do Apache HBase. my-table está listado na seção Tabelas, na página Início.

Ver o código do Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Executar o código

  1. Abra um terminal da sessão do Cloud Shell.

  2. Clone o repositório GoogleCloudDataproc/cloud-dataproc do GitHub no terminal da sessão do Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Altere para o diretório cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Exemplo de resposta:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envie o job do Dataproc.

Java

  1. Defina as versões dos componentes no arquivo pom.xml.
    1. A página Versões de lançamento 2.0.x do Dataproc lista as versões dos componentes Scala, Spark e HBase instaladas com as quatro versões mais recentes e últimos subsecundárias de imagem 2.0.
      1. Para encontrar a versão subsecundária do cluster de versão de imagem 2.0, clique no nome do cluster na página Clusters no Console do Google Cloud para abrir a página Detalhes do cluster, em que a Versão de imagem do cluster está listada.
    2. Como alternativa, é possível executar os seguintes comandos em um terminal de sessão SSH a partir do nó mestre do cluster para determinar as versões dos componentes:
      1. Verifique a versão do scala:
        scala -version
        
      2. Verifique a versão do Spark (control-D para sair):
        spark-shell
        
      3. Verifique a versão do HBase:
        hbase version
        
      4. Identifique as dependências de versão do Spark, Scala e HBase no Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Observação: o hbase-spark.version é a versão atual do conector HBase do Spark. Não altere o número da versão.
    3. Edite o arquivo pom.xml no editor do Cloud Shell para inserir os números de versão corretos do Scala, do Spark e do HBase. Clique em Abrir terminal quando terminar de editar para retornar à linha de comando do terminal do Cloud Shell.
      cloudshell edit .
      
    4. Mude para Java 8 no Cloud Shell. Essa versão do JDK é necessária para criar o código. Ignore as mensagens de aviso do plug-in:
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifique a instalação do Java 8:
      java -version
      
      Exemplo de resposta:
      openjdk version "1.8..."
       
  2. Crie o arquivo jar:
    mvn clean package
    
    O arquivo .jar é colocado no subdiretório /target (por exemplo, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Envie o job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: insira o nome do arquivo .jar depois de "target/" e antes de ".jar".
    • Se você não definiu os caminhos de classe do HBase e do driver do Spark ao criar o cluster, defina-os com cada envio de job incluindo a seguinte sinalização ‑‑properties no comando de envio do job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Veja a saída da tabela HBase na saída do terminal da sessão do Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envie o job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Se você não definiu os caminhos de classe do HBase e do driver do Spark ao criar o cluster, defina-os com cada envio de job incluindo a seguinte sinalização ‑‑properties no comando de envio do job:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Veja a saída da tabela HBase na saída do terminal da sessão do Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Verificar a tabela HBase

É possível verificar o conteúdo da tabela do HBase executando os seguintes comandos no terminal da sessão SSH do nó mestre que você abriu em Verificar a instalação do conector:

  1. Abra o shell do HBase:
    hbase shell
    
  2. Verificar 'my-table&#39:
    scan 'my_table'
    
    Exemplo de resposta:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Limpar

Depois de concluir o tutorial, você pode limpar os recursos que criou para que eles parem de usar a cota e gerar cobranças. Nas seções a seguir, você aprenderá a excluir e desativar esses recursos.

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir o cluster

  • Para excluir o cluster, siga estas etapas:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}