BigQuery-Connector mit Dataproc Serverless für Spark verwenden

Den spark-bigquery-connector verwenden mit Apache Spark um Daten aus und in BigQuery zu lesen und zu schreiben. In dieser Anleitung wird eine PySpark-Anwendung gezeigt, die die Methode spark-bigquery-connector

BigQuery-Connector mit Ihrer Arbeitslast verwenden

Siehe Dataproc Serverless für Spark-Laufzeit-Releases die Version des BigQuery-Connectors die Laufzeitversion der Batcharbeitslast. Ist der Connector nicht aufgeführt, finden Sie im nächsten Abschnitt eine Anleitung, wie Sie den Connector für alle Anwendungen.

Connector mit der Spark-Laufzeitversion 2.0 verwenden

Der BigQuery-Connector ist in der Spark-Laufzeitversion 2.0 nicht installiert. Bei Verwendung Spark-Laufzeitversion 2.0 können Sie den Connector für Ihre Anwendung verfügbar machen, auf eine der folgenden Arten:

  • Verwenden Sie den Parameter jars, um auf eine JAR-Datei des Connectors zu verweisen, wenn Sie senden Ihre Dataproc Serverless for Spark-Batcharbeitslast Im folgenden Beispiel wird eine Connector-JAR-Datei angegeben (siehe GoogleCloudDataproc/spark-bigquery-connector Repository auf GitHub, um eine Liste der verfügbaren Connector-JAR-Dateien zu erhalten.
    • Beispiel für die Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Fügen Sie die JAR-Datei des Connectors als Abhängigkeit in die Spark-Anwendung ein. (siehe Kompilierung über den Connector)

Kosten berechnen

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren. Neuen Cloud Platform-Nutzern steht gegebenenfalls eine kostenlose Testversion zur Verfügung.

BigQuery-E/A

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.

Der Connector schreibt die Wordcount-Ausgabe in BigQuery so:

  1. Daten in temporären Dateien in Ihrem Cloud Storage-Bucket zwischenspeichern

  2. Kopieren Sie die Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket in BigQuery

  3. Temporäre Dateien in Cloud Storage löschen, nachdem die BigQuery- abgeschlossen ist (temporäre Dateien werden ebenfalls gelöscht nach die Spark-Anwendung beendet wird). Wenn der Löschvorgang fehlschlägt, müssen Sie alle unerwünschten temporären Cloud Storage-Dateien, die in der Regel in „gs://your-bucket/.spark-bigquery-jobid-UUID“.

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

PySpark-Wordcount-Batcharbeitslast senden

  1. wordcount_dataset erstellen mit dem bq-Befehlszeilentool oder in einem lokalen Terminal Cloud Shell:
    bq mk wordcount_dataset
    
  2. Cloud Storage-Bucket erstellen mit der Google Cloud CLI in einem lokalen oder in Cloud Shell
    gcloud storage buckets create gs://your-bucket
    
  3. Sehen Sie sich den Code an.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. wordcount.py durch Kopieren lokal in einem Texteditor erstellen den PySpark-Code aus dem PySpark-Codeeintrag. Ersetzen Sie den Parameter [your-bucket] mit dem Namen der Cloud Storage-Bucket, den Sie erstellt haben.
  5. Senden Sie die PySpark-Batcharbeitslast:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Beispielausgabe im Terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Wenn Sie eine Vorschau der Ausgabetabelle in der Google Cloud Console ansehen möchten, öffnen Sie das BigQuery die Tabelle wordcount_output aus und klicken Sie auf Vorschau:

Weitere Informationen