BigQuery-Connector mit Spark verwenden

Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus BigQuery zu lesen und zu schreiben. Diese Anleitung enthält Beispielcode, der den spark-bigquery-connector in einer Spark-Anwendung verwendet. Eine Anleitung zum Erstellen eines Clusters finden Sie in den Dataproc-Kurzanleitungen.

Connector für Ihre Anwendung verfügbar machen

Sie haben folgende Möglichkeiten, den Spark-BigQuery-Connector Ihrer Anwendung zur Verfügung zu stellen:

  1. Installieren Sie den spark-bigquery-Connector im Verzeichnis der Spark-JAR-Dateien jedes Knotens. Verwenden Sie dazu die Initialisierungsaktion für Dataproc-Connectors, wenn Sie Ihren Cluster erstellen.

  2. Geben Sie den Connector-URI beim Senden des Jobs an:

    1. Google Cloud Console: Verwenden Sie das Element Jars files des Spark-Jobs auf der Dataproc-Seite Job senden.
    2. gcloud-Befehlszeile: Verwenden Sie das Flag gcloud dataproc jobs submit spark --jars.
    3. Dataproc API:Verwenden Sie das Feld SparkJob.jarFileUris.
  3. Fügen Sie die JAR-Datei in die Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Kompilieren gegen den Connector).

So legen Sie den URI der Connector-JAR-Datei fest

Versionen von Spark-BigQuery-Connectors werden im GitHub-Repository GoogleCloud Dataproc/spark-bigquery-connector aufgeführt.

Geben Sie die Connector-JAR-Datei an, indem Sie die Scala- und Connector-Versionsinformationen im folgenden URI-String ersetzen:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Scala 2.12 mit Dataproc-Image-Versionen 1.5+ verwenden

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Beispiel für die gcloud-Befehlszeile:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Verwenden Sie die Scala 2.11 mit den Dataproc-Image-Versionen 1.4 und früher:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Beispiel für die gcloud-Befehlszeile:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Lese-/Schreibvorgänge in BigQuery

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.

Der Connector schreibt die Daten in BigQuery, indem sie alle Daten zuerst in einer temporären Cloud Storage-Tabelle zwischenspeichern. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen. Entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien, wenn der Job fehlschlägt. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das den Anmeldedaten oder dem Dienstkonto zugeordnet ist, nach API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

Code ausführen

Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen "wordcount_dataset" oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in Ihrem Google Cloud-Projekt.

Verwenden Sie den bq-Befehl zum Erstellen des wordcount_dataset:

bq mk wordcount_dataset

Verwenden Sie den gsutil-Befehl zum Erstellen eines Cloud Storage-Buckets, der für den Export nach BigQuery verwendet wird:

gsutil mb gs://[bucket]

Scala

  1. Prüfen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. Code auf dem Cluster ausführen
    1. Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
      1. Rufen Sie in der Google Cloud Console die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters
        Seite mit Dataproc-Clustern in der Cloud Console.
      2. Wählen Sie auf der Seite >Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann SSH rechts neben dem Namen des Cluster-Master-Knotens
        Detailseite des Dataproc-Clusters in der Cloud Console.

        Ein Browserfenster wird auf Ihrem Basisverzeichnis auf dem Master-Knoten geöffnet
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstelle wordcount.scala mit dem vorinstallierten Texteditor vi, vim oder nano und füge dann den Scala-Code aus der Scala-Codeliste ein.
      nano wordcount.scala
        
    3. Starten Sie die REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Führen Sie wordcount.scala mit dem :load wordcount.scala-Befehl aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie die Vorschautabelle aufrufen möchten, öffnen Sie die Seite BigQuery, wählen Sie die Tabelle wordcount_output aus und klicken Sie auf Vorschau.
      Vorschautabelle in der BigQuery Explorer-Seite in der Cloud Console

PySpark

  1. Prüfen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Führen Sie den Code auf Ihrem Cluster aus
    1. Mit SSH eine Verbindung zum Dataproc-Cluster-Masterknoten herstellen
      1. Rufen Sie in der Google Cloud Console die Seite Dataproc Cluster auf und klicken Sie auf den Namen Ihres Clusters
        Clusterseite in der Cloud Console.
      2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann SSH rechts neben dem Namen des Cluster-Master-Knotens
        Wählen Sie in der Cloud Console auf der Seite „Clusterdetails“ die Option „SSH on Clustername“ aus.

        Ein Browserfenster wird auf Ihrem Basisverzeichnis auf dem Master-Knoten geöffnet
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstelle wordcount.py mit dem vorinstallierten Texteditor vi, vim oder nano und füge dann den PySpark-Code aus der PySpark-Codeliste ein.
      nano wordcount.py
      
    3. Führen Sie Wordcount mit spark-submit aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie die Vorschautabelle aufrufen möchten, öffnen Sie die Seite BigQuery, wählen Sie die Tabelle wordcount_output aus und klicken Sie auf Vorschau.
      Vorschautabelle in der BigQuery Explorer-Seite in der Cloud Console

Weitere Informationen