このページでは、Spark Spanner コネクタを使用して、Apache Spark で Spanner からデータを読み取る方法について説明します。
費用の計算
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
- Dataproc
- Spanner
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
チュートリアルを実行する前に、コネクタのバージョンを確認し、コネクタ URI を取得します。
コネクタ JAR ファイルの URI を指定する方法
Spark Spanner コネクタのバージョンは、GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリに記載されています。
コネクタ JAR ファイルは、次の URI 文字列のコネクタ バージョン情報を置き換えて指定します。
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
このコネクタは Spark バージョン 3.1+
で使用できます。
gcloud CLI の例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Spanner データベースを準備する
Spanner テーブルがない場合は、チュートリアルに沿って Spanner テーブルを作成できます。その後、インスタンス ID、データベース ID、テーブル Singers
が作成されます。
Dataproc クラスタの作成
コネクタを使用する Dataproc クラスタには、spanner
または cloud-platform
のスコープが必要です。Dataproc クラスタのデフォルトのスコープは、イメージ 2.1 以降では cloud-platform
です。以前のバージョンを使用している場合は、Google Cloud コンソール、Google Cloud CLI、Dataproc API を使用して Dataproc クラスタを作成できます。
Console
- Google Cloud コンソールで、Dataproc の [クラスタの作成] ページを開きます。
- [セキュリティの管理] タブで、[プロジェクト アクセス] セクションの [このクラスタのクラウド プラットフォーム スコープを有効にします] をクリックします。
- クラスタ作成の他のフィールドへの入力および確認を行い、[作成] をクリックします。
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
GceClusterConfig.serviceAccountScopes は、clusters.create リクエストの一部として指定できます。次に例を示します。"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
対応する Spanner 権限が Dataproc VM サービス アカウントに割り当てられていることを確認する必要があります。このチュートリアルで Data Boost を使用する場合は、Data Boost IAM 権限をご覧ください。
Spanner からデータを読み取る
Scala と Python を使用すると、Spark データソース API を使用して Spanner から Spark Dataframe までのデータを読み取ることができます。
Scala
- コードを確認し、[projectId]、[instanceId]、[databaseId]、[table] のプレースホルダを、前に作成したプロジェクト ID、インスタンス ID、データベース ID、テーブルに置き換えます。enableDataBoost オプションを使用すると、Spanner の Data Boost 機能が有効になります。これはメインの Spanner インスタンスへの影響がほぼゼロです。
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- クラスタ上でコードを実行する
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタのマスターノード名の右側にある
SSH
をクリックします。
マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされた
vi
、vim
、nano
テキスト エディタでsingers.scala
を作成し、Scala コードの一覧から Scala コードを貼り付けます。nano singers.scala
spark-shell
REPL を起動します。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
:load singers.scala
コマンドで singers.scala を実行して、Spanner のSingers
テーブルを作成します。出力リストには、Singers 出力の例が表示されます。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- コードを確認し、[projectId]、[instanceId]、[databaseId]、[table] のプレースホルダを、前に作成したプロジェクト ID、インスタンス ID、データベース ID、テーブルに置き換えます。enableDataBoost オプションを使用すると、Spanner の Data Boost 機能が有効になります。これはメインの Spanner インスタンスへの影響がほぼゼロです。
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- クラスタ上でコードを実行する
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある
SSH
をクリックします。
プライマリ ノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされた
vi
、vim
、またはnano
テキスト エディタでsingers.py
を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。nano singers.py
spark-submit
で singers.py を実行し、Spanner のSingers
テーブルを作成します。 次のように出力されます。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
クリーンアップ
このチュートリアルで作成したリソースについて、Google Cloud アカウントへの継続的な課金を回避する手順は、次のとおりです。
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME