BigQuery コネクタを Dataproc Serverless for Spark とともに使用する

Apache Sparkspark-bigquery-connector を使用して、BigQuery との間でデータの読み書きを行います。このチュートリアルでは、spark-bigquery-connector を使用する PySpark アプリケーションについて説明します。

ワークロードへのコネクタを提供する

次のいずれかの方法で、アプリケーションでコネクタを使用できます。

  • Spark 用 Dataproc サーバーレスのバッチ ワークロードを送信するときにコネクタ jar ファイルを指定するには jars パラメータを使用します。次の例では、コネクタの jar ファイルを指定しています(使用可能なコネクタ jar ファイルの一覧については、GitHub の GoogleCloudDataproc/spark-bigquery-connector リポジトリをご覧ください)。
    • SDK の例:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \
          ... other args
      
  • Spark アプリケーションにコネクタ jar ファイルを依存関係として含めます(コネクタのコンパイルをご覧ください)。

費用の計算

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Cloud Platform の新規ユーザーは無料トライアルをご利用いただけます。

BigQuery I/O

このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。

コネクタは、次の方法でワードカウントの出力を BigQuery に書き込みます。

  1. Cloud Storage バケット内の一時ファイルにデータをバッファリングします

  2. 1 回のオペレーションで Cloud Storage バケットから BigQuery にデータをコピーします

  3. BigQuery の読み込みオペレーションが完了すると、Cloud Storage 内の一時ファイルが削除されます(一時ファイルは Spark アプリケーションが終了した後にも削除されます)。削除に失敗した場合は、不要な Cloud Storage 一時ファイルを削除する必要があります。ファイルは通常、gs://your-bucket/.spark-bigquery-jobid-UUID にあります。

課金の構成

デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")

PySpark のワードカウント バッチ ワークロードを送信する

  1. ローカル ターミナルまたは Cloud Shellbq コマンドライン ツールを使用して wordcount_dataset を作成します。
    bq mk wordcount_dataset
    
  2. ローカル ターミナルまたは Cloud Shell で、gsutil コマンドライン ツールを使用して Cloud Storage バケットを作成します。
    gsutil mb gs://your-bucket
    
  3. コードを調べる。
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. PySpark のコード一覧から PySpark コードをコピーして、テキスト エディタで wordcount.py をローカルに作成します。[your-bucket] プレースホルダは作成した Cloud Storage バケットの名前で置き換えます。
  5. PySpark バッチ ワークロードを送信します。
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    ターミナル出力の例:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    コンソールで出力テーブルをプレビューするには、プロジェクトの BigQuery ページで wordcount_output テーブルを選択し、[プレビュー] をクリックします。

詳細情報