BigQuery-Connector mit Dataproc Serverless for Spark verwenden

Verwenden Sie den spark-bigquery-connector mit Apache Spark, um Daten in BigQuery zu lesen und zu schreiben. In dieser Anleitung wird eine PySpark-Anwendung mit der spark-bigquery-connector veranschaulicht.

BigQuery-Connector mit Ihrer Arbeitslast verwenden

Unter Dataproc Serverless for Spark-Laufzeitversionen finden Sie die BigQuery-Connector-Version, die in Ihrer Batch-Arbeitslast-Laufzeitversion installiert ist. Wenn der Anschluss nicht aufgeführt ist, finden Sie im nächsten Abschnitt eine Anleitung dazu, wie Sie ihn für Anwendungen verfügbar machen.

Connector mit Spark-Laufzeitversion 2.0 verwenden

Der BigQuery-Connector ist in der Spark-Laufzeitversion 2.0 nicht installiert. Wenn Sie die Spark-Laufzeitversion 2.0 verwenden, haben Sie folgende Möglichkeiten, den Connector für Ihre Anwendung verfügbar zu machen:

  • Verwenden Sie den Parameter jars, um auf eine Connector-JAR-Datei zu verweisen, wenn Sie Ihre Dataproc Serverless-Batch-Arbeitslast für Spark einreichen. Im folgenden Beispiel wird eine Connector-JAR-Datei angegeben. Eine Liste der verfügbaren Connector-JAR-Dateien finden Sie im GitHub-Repository GoogleCloudDataproc/spark-bigquery-connector.
    • Beispiel für die Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Schließen Sie die JAR-Datei des Connectors als Abhängigkeit in Ihre Spark-Anwendung ein (siehe Für den Connector kompilieren).

Kosten berechnen

In dieser Anleitung werden kostenpflichtige Komponenten von Google Cloud verwendet, darunter:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

Der Preisrechner kann eine Kostenschätzung anhand Ihrer voraussichtlichen Nutzung generieren. Neuen Cloud Platform-Nutzern steht gegebenenfalls eine kostenlose Testversion zur Verfügung.

BigQuery-E/A

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.

Der Connector schreibt die Wortanzahl-Ausgabe so in BigQuery:

  1. Daten in temporären Dateien in Ihrem Cloud Storage-Bucket puffern

  2. Daten in einem Vorgang aus Ihrem Cloud Storage-Bucket in BigQuery kopieren

  3. Die temporären Dateien in Cloud Storage werden nach Abschluss des BigQuery-Ladevorgangs gelöscht. Sie werden auch nach dem Beenden der Spark-Anwendung gelöscht. Wenn das Löschen fehlschlägt, müssen Sie alle unerwünschten temporären Cloud Storage-Dateien löschen, die sich normalerweise in gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID befinden.

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch folgendermaßen einem Lese- oder Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

PySpark-WordCount-Batcharbeitslast einreichen

Eine Spark-Batcharbeitslast ausführen, die die Anzahl der Wörter in einem öffentlichen Datensatz zählt

  1. Öffnen Sie ein lokales Terminal oder Cloud Shell.
  2. Erstellen Sie die wordcount_dataset mit dem bq-Befehlszeilentool in einem lokalen Terminal oder in Cloud Shell.
    bq mk wordcount_dataset
    
  3. Erstellen Sie einen Cloud Storage-Bucket mit der Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Ersetzen Sie YOUR_BUCKET durch den Namen des von Ihnen erstellten Cloud Storage-Bucket.
  4. Erstellen Sie die Datei wordcount.py lokal in einem Texteditor, indem Sie den folgenden PySpark-Code kopieren.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Reichen Sie die PySpark-Batcharbeitslast ein:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Beispiel für eine Terminalausgabe:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Wenn Sie eine Vorschau der Ausgabetabelle in der Google Cloud Console aufrufen möchten, öffnen Sie die Seite BigQuery Ihres Projekts, wählen Sie die Tabelle wordcount_output aus und klicken Sie dann auf Vorschau.

Weitere Informationen