Utiliser le connecteur BigQuery avec Spark

Le spark-bigquery-connector est utilisé avec Apache Spark pour lire et écrire des données depuis et vers BigQuery. Ce tutoriel fournit un exemple de code utilisant le connecteur spark-bigquery dans une application Spark. Pour obtenir des instructions sur la création d'un cluster, consultez les guides de démarrage rapide de Dataproc.

Rendre le connecteur disponible pour votre application

Vous pouvez rendre le connecteur spark-bigquery à la disposition de votre application de l'une des manières suivantes:

  1. Installez le connecteur spark-bigquery dans le répertoire JAR Spark de chaque nœud à l'aide de l'action d'initialisation des connecteurs Dataproc lors de la création du cluster.

  2. Indiquez l'URI du connecteur lorsque vous envoyez la tâche:

    1. Console Google Cloud:utilisez l'élément de tâche Spark Jars files sur la page Dataproc Envoyer une tâche.
    2. gcloud CLI:utilisez l'option gcloud dataproc jobs submit spark --jars.
    3. API Dataproc:utilisez le champ SparkJob.jarFileUris.
  3. Incluez le fichier JAR dans votre application Scala ou Java Spark en tant que dépendance (consultez la section Compiler avec le connecteur).

Spécifier l'URI JAR du connecteur

Les versions du connecteur Spark-BigQuery sont répertoriées dans le dépôt GoogleCloudDataproc/spark-bigquery-connector GitHub.

Spécifiez le fichier JAR du connecteur en remplaçant les informations de version de Scala et du connecteur dans la chaîne d'URI suivante:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Utiliser Scala 2.12 avec les versions d'image Dataproc 1.5+

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Exemple avec gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Utilisez Scala 2.11 avec les versions d'image Dataproc 1.4 et antérieures:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Exemple avec gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Calculer les coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataproc
  • BigQuery
  • Cloud Storage

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Lire et écrire des données depuis BigQuery

Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.

Le connecteur écrit les données dans BigQuery en les mettant d'abord en mémoire tampon dans une table Cloud Storage temporaire. Il copie ensuite toutes les données depuis BigQuery en une seule opération. Le connecteur tente de supprimer les fichiers temporaires une fois l'opération de chargement BigQuery terminée, puis effectue une nouvelle tentative lorsque l'application Spark se termine. Si la tâche échoue, supprimez tous les fichiers Cloud Storage temporaires restants. En règle générale, les fichiers BigQuery temporaires se trouvent dans gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configurer la facturation

Par défaut, le projet associé aux identifiants ou au compte de service est facturé pour l'utilisation de l'API. Pour facturer un autre projet, définissez la configuration suivante : spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Le projet à facturer peut également être défini au niveau des opérations de lecture/écriture, comme ceci : .option("parentProject", "<BILLED-GCP-PROJECT>").

Exécuter le code

Avant d'exécuter cet exemple, créez un ensemble de données nommé "WordCount_dataset" ou remplacez l'ensemble de données de sortie dans le code par un ensemble de données BigQuery existant dans votre projet Google Cloud.

Utilisez la commande bq pour créer l'élément wordcount_dataset :

bq mk wordcount_dataset

Utilisez la commande gsutil pour créer un bucket Cloud Storage qui servira à l'exportation vers BigQuery :

gsutil mb gs://[bucket]

Scala

  1. Examinez le code et remplacez l'espace réservé [bucket] par le bucket Cloud Storage que vous avez créé précédemment.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. Exécutez le code sur votre cluster.
    1. Utiliser SSH pour vous connecter au nœud maître du cluster Dataproc
      1. Accédez à la page Clusters Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster
        Page des clusters Dataproc dans la console Cloud.
      2. Sur la page >Détails du cluster, sélectionnez l'onglet "Instances de VM". Ensuite, cliquez sur SSH à droite du nom du nœud maître du cluster
        Page "Détails du cluster Dataproc" de la console Cloud

        Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maître
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Créez wordcount.scala avec l'éditeur de texte vi, vim ou nano préinstallé, puis collez le code Scala dans la liste de codes Scala.
      nano wordcount.scala
        
    3. Lancez la REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Exécutez "WordCount.scala" avec la commande :load wordcount.scala pour créer la table BigQuery wordcount_output. La liste de sortie affiche 10 lignes du résultat de la commande "wordcount".
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Pour prévisualiser la table de sortie, ouvrez la page BigQuery, sélectionnez la table wordcount_output, puis cliquez sur Aperçu.
      Prévisualiser la table sur la page de l'explorateur BigQuery dans la console Cloud.

PySpark

  1. Examinez le code et remplacez l'espace réservé [bucket] par le bucket Cloud Storage que vous avez créé précédemment.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Exécutez le code sur votre cluster
    1. Utiliser SSH pour vous connecter au nœud maître du cluster Dataproc
      1. Accédez à la page Clusters Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster
        Page "Clusters" de la console Cloud.
      2. Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Ensuite, cliquez sur SSH à droite du nom du nœud maître du cluster
        Sélectionnez "SSH sur la ligne du nom du cluster" sur la page "Détails du cluster" de la console Cloud.

        Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maître
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Créez wordcount.py avec l'éditeur de texte vi, vim ou nano préinstallé, puis collez le code PySpark à partir de la liste de codes PySpark
      nano wordcount.py
      
    3. Exécutez "wordcount" avec spark-submit pour créer la table BigQuery wordcount_output. La liste de sortie affiche 10 lignes du résultat de la commande "wordcount".
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Pour prévisualiser la table de sortie, ouvrez la page BigQuery, sélectionnez la table wordcount_output, puis cliquez sur Aperçu.
      Prévisualiser la table sur la page de l'explorateur BigQuery dans la console Cloud.

Pour en savoir plus