Utilizzare il connettore BigQuery con Dataproc serverless per Spark

Utilizza spark-bigquery-connector con Apache Spark per leggere e scrivere dati da e in BigQuery. Questo tutorial mostra un'applicazione PySpark che utilizza spark-bigquery-connector.

Utilizza il connettore BigQuery con il tuo carico di lavoro

Consulta le release del runtime Dataproc Serverless per Spark per determinare la versione del connettore BigQuery installata nella versione del runtime del tuo carico di lavoro batch. Se il connettore non è presente nell'elenco, consulta la sezione successiva per istruzioni su come renderlo disponibile per le applicazioni.

Come utilizzare il connettore con il runtime Spark versione 2.0

Il connettore BigQuery non è installato nella versione 2.0 del runtime Spark. Quando si utilizza Versione 2.0 del runtime Spark, puoi rendere il connettore disponibile per la tua applicazione in uno dei seguenti modi:

  • Utilizza il parametro jars per fare riferimento a un file jar del connettore quando invii il tuo carico di lavoro batch di Dataproc Serverless per Spark. L'esempio seguente specifica un file jar del connettore (consulta il repository GoogleCloudDataproc/spark-bigquery-connector su GitHub per un elenco dei file jar del connettore disponibili).
    • Esempio di Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Includi il file jar del connettore nell'applicazione Spark come dipendenza (vedi Compilazione rispetto al connettore)

Calcola i costi

Questo tutorial utilizza componenti fatturabili di Google Cloud, tra cui:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

Utilizza il Calcolatore prezzi per generare una stima dei costi in base all'utilizzo previsto. I nuovi utenti della piattaforma Cloud potrebbero hai diritto a una prova gratuita.

BigQuery I/O

Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio parole utilizzando l'origine dati standard l'API.

Il connettore scrive l'output del conteggio parole in BigQuery come segue:

  1. Mettere in buffer i dati in file temporanei nel bucket Cloud Storage

  2. Copiare i dati in un'operazione dal bucket Cloud Storage in BigQuery

  3. Eliminazione dei file temporanei in Cloud Storage al termine dell'operazione di caricamento di BigQuery (i file temporanei vengono eliminati anche al termine dell'applicazione Spark). Se l'eliminazione non va a buon fine, dovrai eliminare eventuali file Cloud Storage temporanei indesiderati, che in genere vengono posizionati in gs://your-bucket/.spark-bigquery-jobid-UUID.

Configura la fatturazione

Per impostazione predefinita, al progetto associato alle credenziali o all'account di servizio viene addebitato l'utilizzo dell'API. Per fatturare un progetto diverso, imposta quanto segue configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Può anche essere aggiunto a un'operazione di lettura/scrittura, come segue: .option("parentProject", "<BILLED-GCP-PROJECT>").

Invia un carico di lavoro batch per il conteggio di parole PySpark

  1. Crea wordcount_dataset con lo strumento a riga di comando bq in un terminale locale o in Cloud Shell.
    bq mk wordcount_dataset
    
  2. Crea un bucket Cloud Storage con Google Cloud CLI in un terminale locale o in Cloud Shell.
    gcloud storage buckets create gs://your-bucket
    
  3. Esamina il codice.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  4. Crea wordcount.py localmente in un editor di testo copiando il codice PySpark dall'elenco del codice PySpark. Sostituisci il segnaposto [your-bucket] con il nome del bucket Cloud Storage che hai creato.
  5. Invia il carico di lavoro batch PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Output del terminale di esempio:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Per visualizzare l'anteprima della tabella di output nella console Google Cloud, apri BigQuery pagina, seleziona la tabella wordcount_output e fai clic su Anteprima.

Per ulteriori informazioni