Utilizzare il connettore BigQuery con Spark

Il spark-bigquery-connector viene utilizzato con Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial fornisce un codice di esempio che utilizza il connettore spark-bigquery-all'interno di un'applicazione Spark. Per istruzioni sulla creazione di un cluster, consulta le guide rapide di Dataproc.

Rendi il connettore disponibile per la tua applicazione

Puoi rendere disponibile il connettore spark-bigquery-per l'applicazione in uno dei seguenti modi:

  1. Installa spark-bigquery-connector nella directory Spark jars di ogni nodo utilizzando l'azione di inizializzazione dei connettori Dataproc quando crei il cluster.

  2. Fornisci l'URI del connettore quando invii il job:

    1. Console Google Cloud: utilizza l'elemento del job Spark Jars files nella pagina Invia un job di Dataproc.
    2. gcloud CLI: usa il flag gcloud dataproc jobs submit spark --jars.
    3. API Dataproc: utilizza il campo SparkJob.jarFileUris.
  3. Includi il jar nell'applicazione Scala o Java Spark come dipendenza (consulta la sezione Compilazione rispetto al connettore).

Come specificare l'URI jar del connettore

Le versioni del connettore Spark-BigQuery sono elencate nel repository GoogleCloudDataproc/spark-bigquery-connector.

Specifica il jar del connettore sostituendo le informazioni sulla versione di Scala e del connettore nella seguente stringa URI:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Utilizzo di Scala 2.12 con le versioni immagine di Dataproc 1.5+

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Esempio della gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Utilizza Scala 2.11 con immagini Dataproc versioni 1.4 e precedenti:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Esempio della gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Calcolo dei costi

In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi. I nuovi utenti di Google Cloud potrebbero essere idonei per una prova gratuita.

Lettura e scrittura di dati da BigQuery

Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio di parole utilizzando l'API standard per l'origine dati.

Il connettore scrive i dati in BigQuery eseguendo prima il buffering di tutti i dati in una tabella temporanea Cloud Storage. Poi, copia tutti i dati in BigQuery in un'unica operazione. Il connettore tenta di eliminare i file temporanei una volta che l'operazione di caricamento di BigQuery è riuscita e di nuovo al termine dell'applicazione Spark. Se il job non riesce, rimuovi tutti i file Cloud Storage temporanei. In genere, i file BigQuery temporanei si trovano in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configurazione della fatturazione

Per impostazione predefinita, al progetto associato alle credenziali o all'account di servizio viene addebitato l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Può anche essere aggiunto a un'operazione di lettura/scrittura, come segue: .option("parentProject", "<BILLED-GCP-PROJECT>").

Esecuzione del codice

Prima di eseguire questo esempio, crea un set di dati denominato "wordcount_dataset" o modifica il set di dati di output nel codice inserendo un set di dati BigQuery esistente nel tuo progetto Google Cloud.

Utilizza il comando bq per creare la wordcount_dataset:

bq mk wordcount_dataset

Utilizza il comando gsutil per creare un bucket Cloud Storage, che verrà utilizzato per l'esportazione in BigQuery:

gsutil mb gs://[bucket]

Scala

  1. Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. Esegui il codice nel tuo cluster.
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del tuo cluster
        Cluster Dataproc nella console Cloud.
      2. Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Pagina dei dettagli del cluster Dataproc nella console Cloud.

        Si apre una finestra del browser nella home directory sul nodo master.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.scala con l'editor di testo vi, vim o nano preinstallato, poi incolla il codice Scala dall'elenco di codici di Scala
      nano wordcount.scala
        
    3. Avvia il REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Esegui wordcount.scala con il comando :load wordcount.scala per creare la tabella BigQuery wordcount_output. L'elenco di output mostra 20 righe dell'output del conteggio parole.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Per visualizzare l'anteprima della tabella di output, apri la pagina BigQuery, seleziona la tabella wordcount_output, quindi fai clic su Anteprima.
      Visualizza l'anteprima della tabella nella pagina Explorer di BigQuery nella console Cloud.

PySpark

  1. Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Esegui il codice sul tuo cluster
    1. Utilizza SSH per connetterti al nodo master del cluster Dataproc
      1. Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del tuo cluster
        pagina Cluster nella console Cloud.
      2. Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su SSH a destra del nome del nodo master del cluster
        Seleziona SSH nella riga del nome del cluster nella pagina Dettagli cluster nella console Cloud.

        Si apre una finestra del browser nella home directory sul nodo master.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.py con l'editor di testo vi, vim o nano preinstallato, poi incolla il codice PySpark dall'elenco dei codici PySpark
      nano wordcount.py
      
    3. Esegui conteggio parole con spark-submit per creare la tabella BigQuery wordcount_output. L'elenco di output mostra 20 righe dell'output del conteggio parole.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Per visualizzare l'anteprima della tabella di output, apri la pagina BigQuery, seleziona la tabella wordcount_output, quindi fai clic su Anteprima.
      Visualizza l'anteprima della tabella nella pagina Explorer di BigQuery nella console Cloud.

Per maggiori informazioni