Utiliser le connecteur BigQuery avec Dataproc sans serveur pour Spark

Utilisez spark-bigquery-connector avec Apache Spark pour lire et écrire des données depuis et dans BigQuery. Ce tutoriel présente une application PySpark qui utilise le spark-bigquery-connector.

Utiliser le connecteur BigQuery avec votre charge de travail

Consultez la section Versions de l'environnement d'exécution Dataproc sans serveur pour Spark pour déterminer la version du connecteur BigQuery installée dans la version d'exécution de votre charge de travail par lot. Si le connecteur ne figure pas dans la liste, consultez la section suivante pour savoir comment le mettre à la disposition des applications.

Utiliser le connecteur avec la version d'exécution 2.0 Spark

Le connecteur BigQuery n'est pas installé dans la version d'exécution 2.0 de Spark. Lorsque vous utilisez la version d'exécution 2.0 de Spark, vous pouvez mettre le connecteur à la disposition de votre application de l'une des manières suivantes:

  • Utilisez le paramètre jars pour pointer vers un fichier JAR de connecteur lorsque vous envoyez votre charge de travail par lot Dataproc sans serveur pour Spark. L'exemple suivant spécifie un fichier JAR de connecteur (consultez le dépôt GoogleCloudDataproc/spark-bigquery-connector sur GitHub pour obtenir la liste des fichiers JAR disponibles).
    • Exemple de Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Incluez le fichier JAR du connecteur dans votre application Spark en tant que dépendance (consultez Compilation avec le connecteur).

Calculer les coûts

Ce tutoriel utilise des composants facturables de Google Cloud, dont :

  • Dataproc sans serveur
  • BigQuery
  • Cloud Storage

Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue. Les nouveaux utilisateurs de Cloud Platform peuvent bénéficier d'un essai gratuit.

E/S BigQuery

Cet exemple lit les données de BigQuery dans un DataFrame Spark pour effectuer un décompte de mots à l'aide de l'API de source de données standard.

Le connecteur écrit la sortie du nombre de mots dans BigQuery en:

  1. Mettre les données en mémoire tampon dans des fichiers temporaires dans votre bucket Cloud Storage

  2. Copier les données en une seule opération de votre bucket Cloud Storage vers BigQuery

  3. Suppression des fichiers temporaires dans Cloud Storage une fois l'opération de chargement BigQuery terminée (les fichiers temporaires sont également supprimés après l'arrêt de l'application Spark). Si la suppression échoue, vous devez supprimer tous les fichiers Cloud Storage temporaires indésirables, généralement placés dans gs://your-bucket/.spark-bigquery-jobid-UUID.

Configurer la facturation

Par défaut. le projet qui est facturé pour l'utilisation de l'API est le projet associé aux identifiants ou au compte de service. Pour facturer un autre projet, définissez la configuration suivante : spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Le projet à facturer peut également être défini au niveau des opérations de lecture/écriture, comme ceci : .option("parentProject", "<BILLED-GCP-PROJECT>").

Envoyer une charge de travail par lot de décompte de mots PySpark

  1. Créez le fichier wordcount_dataset à l'aide de l'outil de ligne de commande bq dans un terminal local ou dans Cloud Shell.
    bq mk wordcount_dataset
    
  2. Créez un bucket Cloud Storage à l'aide de l'outil de ligne de commande gsutil" dans un terminal local ou dans Cloud Shell.
    gsutil mb gs://your-bucket
    
  3. Examinez le code.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. Créez wordcount.py localement dans un éditeur de texte en copiant le code PySpark à partir de la liste de codes PySpark, puis remplacez l'espace réservé [your-bucket] par le nom du bucket Cloud Storage que vous avez créé.
  5. Envoyez la charge de travail par lot PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Exemple de sortie de terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Pour prévisualiser la table de sortie dans la console Google Cloud, ouvrez la page BigQuery de votre projet, sélectionnez la table wordcount_output, puis cliquez sur Aperçu.

Pour en savoir plus