spark-bigquery-connector と Apache Spark を併用すると、BigQuery との間でデータの読み書きを行えます。このチュートリアルでは、spark-bigquery-connector
を使用する PySpark アプリケーションについて説明します。
ワークロードで BigQuery コネクタを使用する
バッチ ワークロードのランタイム バージョンにインストールされている BigQuery コネクタのバージョンを確認するには、Spark 向け Dataproc Serverless ランタイム リリースをご覧ください。コネクタがリストにない場合は、次のセクションでコネクタをアプリケーションで使用可能にする方法をご覧ください。
Spark ランタイム バージョン 2.0 でコネクタを使用する方法
Spark ランタイム バージョン 2.0 では、BigQuery コネクタはインストールされません。Spark ランタイム バージョン 2.0 を使用する場合は、次のいずれかの方法でコネクタをアプリケーションで使用できます。
- Spark 用 Dataproc サーバーレスのバッチ ワークロードを送信するときにコネクタ jar ファイルを指定するには
jars
パラメータを使用します。次の例では、コネクタの jar ファイルを指定しています(使用可能なコネクタ jar ファイルの一覧については、GitHub の GoogleCloudDataproc/spark-bigquery-connector リポジトリをご覧ください)。- Google Cloud CLI の例:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Google Cloud CLI の例:
- Spark アプリケーションにコネクタ jar ファイルを依存関係として含めます(コネクタのコンパイルをご覧ください)。
費用の計算
このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。
- Dataproc Serverless
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Cloud Platform の新規ユーザーは無料トライアルをご利用いただけます。
BigQuery I/O
このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。
コネクタは、次の方法でワードカウント出力を BigQuery に書き込みます。
Cloud Storage バケット内の一時ファイルにデータをバッファリングします
1 回のオペレーションで Cloud Storage バケットから BigQuery にデータをコピーします
BigQuery の読み込みオペレーションが完了すると、Cloud Storage 内の一時ファイルが削除されます(一時ファイルは Spark アプリケーションが終了した後にも削除されます)。削除に失敗した場合は、不要な Cloud Storage 一時ファイルを削除する必要があります。ファイルは通常、
gs://your-bucket/.spark-bigquery-jobid-UUID
にあります。
課金の構成
デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")
。
PySpark wordcount バッチ ワークロードを送信する
- ローカル ターミナルまたは Cloud Shell で bq コマンドライン ツールを使用して
wordcount_dataset
を作成します。bq mk wordcount_dataset
- ローカル ターミナルまたは Cloud Shell で、Google Cloud CLI を使用して Cloud Storage バケットを作成します。
gcloud storage buckets create gs://your-bucket
- コードを調べる。
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- PySpark のコード一覧から PySpark コードをコピーして、テキスト エディタで
wordcount.py
をローカルに作成します。[your-bucket] プレースホルダは作成した Cloud Storage バケットの名前で置き換えます。 - PySpark バッチ ワークロードを送信します。
ターミナル出力例:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Google Cloud コンソールで出力テーブルをプレビューするには、プロジェクトの [BigQuery] ページを開き、wordcount_output
テーブルを選択して、[プレビュー] をクリックします。