Apache Spark mit HBase auf Dataproc verwenden


Lernziele

In dieser Anleitung wird Folgendes beschrieben:

  1. Dataproc-Cluster erstellen und Apache HBase und Apache ZooKeeper auf dem Cluster installieren
  2. Eine HBase-Tabelle mithilfe der HBase-Shell erstellen, die auf dem Master-Knoten des Dataproc-Clusters ausgeführt wird
  3. Senden Sie mit Cloud Shell einen Java- oder PySpark Spark-Job an den Dataproc-Dienst, der Daten in die HBase-Tabelle schreibt und dann liest.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

Erstellen Sie ein Google Cloud Platform-Projekt, falls dies noch nicht geschehen ist.

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  4. Dataproc and Compute Engine APIs aktivieren.

    Aktivieren Sie die APIs

  5. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  6. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  7. Dataproc and Compute Engine APIs aktivieren.

    Aktivieren Sie die APIs

Dataproc-Cluster erstellen

  1. Führen Sie den folgenden Befehl in einem Cloud Shell-Sitzungsterminal aus, um:

    • HBase und ZooKeeper-Komponenten installieren
    • Stellen Sie drei Worker-Knoten bereit (zum Ausführen des Codes in dieser Anleitung werden drei bis fünf Worker empfohlen)
    • Component Gateway aktivieren
    • Image-Version 2.0 verwenden
    • Verwenden Sie das Flag --properties, um die HBase-Konfiguration und die HBase-Bibliothek den Spark-Treibern und den Executor-Klassenpfaden hinzuzufügen.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Connector-Installation überprüfen

  1. Stellen Sie über die Google Cloud Console oder ein Cloud Shell-Sitzungsterminal eine SSH-Verbindung zum Masterknoten des Dataproc-Clusters her.

  2. Überprüfen Sie die Installation des Apache HBase Spark-Connector auf dem Master-Knoten:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Beispielausgabe:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Lassen Sie das SSH-Sitzungsterminal geöffnet für:

    1. HBase-Tabelle erstellen
    2. (Java-Nutzer): Führen Sie Befehle auf dem Masterknoten des Clusters aus, um die Versionen der im Cluster installierten Komponenten zu ermitteln.
    3. Scannen Sie Ihre Hbase-Tabelle, nachdem Sie den Code ausgeführt haben

HBase-Tabelle erstellen

Führen Sie die in diesem Abschnitt aufgeführten Befehle im SSH-Sitzungsterminal des Master-Knotens aus, das Sie im vorherigen Schritt geöffnet haben.

  1. Öffnen Sie die HBase-Shell:

    hbase shell
    

  2. Erstellen Sie eine HBase 'my-table' mit einer 'cf'-Spaltenfamilie:

    create 'my_table','cf'
    

    1. Klicken Sie in der Google Cloud Console in den Component Gateway-Links der Google Cloud Console auf HBase, um die Apache HBase-UI zu öffnen und die Tabellenerstellung zu bestätigen. my-table ist auf der Startseite im Abschnitt Tabellen aufgeführt.

Spark-Code ansehen

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Code ausführen

  1. Öffnen Sie ein Cloud Shell-Sitzungsterminal.

  2. Klonen Sie das GitHub-Repository GoogleCloudDataproc/cloud-dataproc in Ihr Cloud Shell-Sitzungsterminal:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Wechseln Sie in das Verzeichnis cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Beispielausgabe:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Senden Sie den Dataproc-Job.

Java

  1. Legen Sie die Komponentenversionen in der Datei pom.xml fest.
    1. Auf der Seite Release-Versionen von Dataproc 2.0.x von Dataproc werden die Scala-, Spark- und HBase-Komponentenversionen aufgelistet, die mit den neuesten und letzten vier Image 2.0-Sub-Minor-Versionen installiert wurden.
      1. Wenn Sie die Sub-Minor-Version des Clusters der Image-Version 2.0 ermitteln möchten, klicken Sie in der Google Cloud Console auf der Seite Cluster auf den Namen des Clusters. Daraufhin wird die Seite Clusterdetails geöffnet, auf der die Image-Version des Clusters aufgeführt ist.
    2. Alternativ können Sie die folgenden Befehle in einem SSH-Sitzungsterminal vom Masterknoten des Clusters ausführen, um die Komponentenversionen zu ermitteln:
      1. Scala-Version prüfen:
        scala -version
        
      2. Prüfen Sie die Spark-Version (Strg + D zum Beenden):
        spark-shell
        
      3. HBase-Version prüfen:
        hbase version
        
      4. Ermitteln Sie die Abhängigkeiten der Spark-, Scala- und HBase-Version im Maven-pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Hinweis: hbase-spark.version ist die aktuelle Version des Spark HBase-Connectors. Lassen Sie diese Versionsnummer unverändert.
    3. Bearbeiten Sie die Datei pom.xml im Cloud Shell-Editor, um die richtige Scala-, Spark- und HBase-Versionsnummer einzufügen. Klicken Sie nach der Bearbeitung auf Terminal öffnen, um zur Befehlszeile des Cloud Shell-Terminals zurückzukehren.
      cloudshell edit .
      
    4. Wechseln Sie in Cloud Shell zu Java 8. Diese JDK-Version wird zum Erstellen des Codes benötigt. Sie können alle Warnmeldungen des Plug-ins ignorieren:
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Java 8-Installation prüfen:
      java -version
      
      Beispielausgabe:
      openjdk version "1.8..."
       
  2. Erstellen Sie die Datei jar:
    mvn clean package
    
    Die Datei .jar wird im Unterverzeichnis /target gespeichert, z. B. target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Senden Sie den Job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Geben Sie den Namen der .jar-Datei nach „target/“ und vor „.jar“ ein.
    • Wenn Sie die HBase-Klassenpfade des Spark-Treibers und des Executors nicht beim Erstellen des Clusters festgelegt haben, müssen Sie sie bei jeder Jobübermittlung festlegen. Fügen Sie dazu das folgende Flag ‑‑properties in den Befehl zum Senden des Jobs ein:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Sehen Sie sich die Ausgabe der HBase-Tabelle in der Ausgabe des Cloud Shell-Sitzungsterminals an:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Senden Sie den Job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Wenn Sie die HBase-Klassenpfade des Spark-Treibers und des Executors nicht beim Erstellen des Clusters festgelegt haben, müssen Sie sie bei jeder Jobübermittlung festlegen. Fügen Sie dazu das folgende Flag ‑‑properties in den Befehl zum Senden des Jobs ein:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Sehen Sie sich die Ausgabe der HBase-Tabelle in der Ausgabe des Cloud Shell-Sitzungsterminals an:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

HBase-Tabelle scannen

Sie können den Inhalt Ihrer HBase-Tabelle scannen, indem Sie die folgenden Befehle im SSH-Sitzungsterminal des Master-Knotens ausführen, das Sie unter Connector-Installation überprüfen geöffnet haben:

  1. Öffnen Sie die HBase-Shell:
    hbase shell
    
  2. Durchsuchen Sie 'my-table':
    scan 'my_table'
    
    Beispielausgabe:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Bereinigen

Nachdem Sie die Anleitung abgeschlossen haben, können Sie die erstellten Ressourcen bereinigen, damit sie keine Kontingente mehr nutzen und keine Gebühren mehr anfallen. In den folgenden Abschnitten erfahren Sie, wie Sie diese Ressourcen löschen oder deaktivieren.

Projekt löschen

Am einfachsten vermeiden Sie weitere Kosten durch Löschen des für die Anleitung erstellten Projekts.

So löschen Sie das Projekt:

  1. Wechseln Sie in der Google Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Cluster löschen

  • So löschen Sie den Cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}