BigQuery-Connector mit Dataproc Serverless für Spark verwenden

Verwenden Sie den spark-bigquery-connector mit Apache Spark, um Daten in BigQuery zu lesen und zu schreiben. In dieser Anleitung wird eine PySpark-Anwendung gezeigt, die spark-bigquery-connector verwendet.

BigQuery-Connector mit Ihrer Arbeitslast verwenden

Unter Dataproc Serverless for Spark-Laufzeitversionen finden Sie die Version des BigQuery-Connectors, die in der Laufzeitversion Ihrer Batcharbeitslast installiert ist. Wenn der Connector nicht aufgeführt ist, finden Sie im nächsten Abschnitt eine Anleitung dazu, wie Sie ihn für Anwendungen verfügbar machen.

Connector mit Spark-Laufzeitversion 2.0 verwenden

Der BigQuery-Connector ist in der Spark-Laufzeitversion 2.0 nicht installiert. Wenn Sie die Spark-Laufzeitversion 2.0 verwenden, können Sie den Connector auf eine der folgenden Arten für Ihre Anwendung verfügbar machen:

  • Verwenden Sie den Parameter jars, um auf eine Connector-JAR-Datei zu verweisen, wenn Sie die Batcharbeitslast von Dataproc Serverless for Spark senden. Im folgenden Beispiel wird eine Connector-JAR-Datei angegeben. Eine Liste der verfügbaren Connector-JAR-Dateien finden Sie im Repository GoogleCloudDataproc/spark-bigquery-connector auf GitHub.
    • Beispiel für die Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Fügen Sie die Connector-JAR-Datei als Abhängigkeit in Ihre Spark-Anwendung ein (siehe Mit dem Connector kompilieren).

Kosten berechnen

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren. Neuen Cloud Platform-Nutzern steht gegebenenfalls eine kostenlose Testversion zur Verfügung.

BigQuery-E/A

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.

Der Connector schreibt die Wordcount-Ausgabe in BigQuery:

  1. Daten in temporären Dateien in Ihrem Cloud Storage-Bucket zwischenspeichern

  2. Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket nach BigQuery kopieren

  3. Löschen der temporären Dateien in Cloud Storage nach Abschluss des BigQuery-Ladevorgangs (temporäre Dateien werden auch gelöscht, nachdem die Spark-Anwendung beendet wurde). Wenn das Löschen fehlschlägt, müssen Sie alle unerwünschten temporären Cloud Storage-Dateien löschen. Diese befinden sich normalerweise in gs://your-bucket/.spark-bigquery-jobid-UUID.

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

PySpark-Wordcount-Batcharbeitslast senden

  1. Erstellen Sie den wordcount_dataset mit dem bq-Befehlszeilentool in einem lokalen Terminal oder in Cloud Shell.
    bq mk wordcount_dataset
    
  2. Erstellen Sie mit dem gsutil-Befehlszeilentool einen Cloud Storage-Bucket in einem lokalen Terminal oder in Cloud Shell.
    gsutil mb gs://your-bucket
    
  3. Sehen Sie sich den Code an.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. Erstellen Sie wordcount.py lokal in einem Texteditor. Kopieren Sie dazu den PySpark-Code aus der PySpark-Codeliste und ersetzen Sie den Platzhalter [your-bucket] durch den Namen des von Ihnen erstellten Cloud Storage-Bucket.
  5. Senden Sie die PySpark-Batcharbeitslast:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Beispiel für eine Terminalausgabe:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Wenn Sie sich eine Vorschau der Ausgabetabelle in der Google Cloud Console ansehen möchten, öffnen Sie die Seite BigQuery Ihres Projekts, wählen Sie die Tabelle wordcount_output aus und klicken Sie auf Vorschau.

Weitere Informationen