このページでは、Spark Spanner コネクタを使用して、Apache Spark で Spanner からデータを読み取る Dataproc クラスタを作成する方法について説明します。
Spanner コネクタは Spark と連携して、Spanner Java ライブラリを使用して Spanner データベースからデータを読み取ります。Spanner コネクタは、Spanner のテーブルとグラフを Spark の DataFrame と GraphFrame に読み込むことをサポートしています。
費用
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
- Dataproc
- Spanner
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
このチュートリアルで Spanner コネクタを使用する前に、Dataproc クラスタと Spanner インスタンスとデータベースを設定します。
Dataproc クラスタを設定する
Dataproc クラスタを作成するか、次の設定を持つ既存の Dataproc クラスタを使用します。
VM サービス アカウントの権限。クラスタの VM サービス アカウントには、適切な Spanner 権限を割り当てる必要があります。Data Boost を使用する場合(Spanner テーブルをエクスポートするのサンプルコードでは Data Boost が有効になっています)、VM サービス アカウントに必要な Data Boost IAM 権限も付与されている必要があります。
アクセス スコープ。クラスタは、
cloud-platform
スコープまたは適切なspanner
スコープを有効にして作成する必要があります。イメージ バージョン 2.1 以降で作成されたクラスタでは、cloud-platform
スコープがデフォルトで有効になっています。次の手順では、 Google Cloud コンソール、gcloud CLI、または Dataproc API を使用するクラスタ作成リクエストの一部として
cloud-platform
スコープを設定する方法について説明します。クラスタの作成手順については、クラスタを作成するをご覧ください。Google Cloud コンソール
- Google Cloud コンソールで、Dataproc の [クラスタの作成] ページを開きます。
- [セキュリティ管理] パネルの [プロジェクト アクセス] セクションで、[このクラスタのクラウド プラットフォーム スコープを有効にする] をクリックします。
- クラスタ作成の他のフィールドに入力するか、確認して、[作成] をクリックします。
gcloud CLI
次の
gcloud dataproc clusters create
コマンドを実行して、cloud-platform
スコープが有効なクラスタを作成できます。gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
clusters.create リクエストの一部として GceClusterConfig.serviceAccountScopes を指定できます。
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Singers データベース テーブルを使用して Spanner インスタンスを設定する
Singers
テーブルを含むデータベースを使用して Spanner インスタンスを作成します。Spanner インスタンス ID とデータベース ID をメモします。
Spark で Spanner コネクタを使用する
Spanner コネクタは、Spark バージョン 3.1+
で使用できます。ジョブを送信するときに、Cloud Storage コネクタ JAR ファイル仕様の一部としてコネクタ バージョンを指定します。
例: Spanner コネクタを使用した gcloud CLI Spark ジョブの送信。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
次のように置き換えます。
CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の GoogleCloudDataproc/spark-spanner-connector
リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
Spanner テーブルを読み取る
Python または Scala を使用して、Spark データソース API を使用して Spanner テーブルデータを Spark Dataframe に読み取ることが可能です。
PySpark
このセクションの PySpark コードの例をクラスタで実行するには、Dataproc サービスにジョブを送信するか、クラスタ マスターノードの spark-submit
REPL からジョブを実行します。
Dataproc ジョブ
- ローカル テキスト エディタを使用するか、Cloud Shell でプリインストールされている
vi
、vim
、またはnano
テキスト エディタを使用してsingers.py
ファイルを作成します。 - プレースホルダ変数に値を入力したら、次のコードを
singers.py
ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singers
データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.py
ファイルを保存します。- Google Cloud コンソール、gcloud CLI、または Dataproc API を使用して、Dataproc サービスにジョブを送信します。
例: Spanner コネクタを使用した gcloud CLI ジョブの送信。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
次のように置き換えます。
- CLUSTER_NAME: 新しいクラスタの名前。
- REGION: ワークロードを実行できる利用可能な Compute Engine リージョン。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connector
リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
spark-submit ジョブ
- SSH を使用して Dataproc クラスタ マスターノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSH
をクリックします。マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされている
vi
、vim
、nano
テキスト エディタを使用して、マスターノードにsingers.py
ファイルを作成します。- 次のコードを
singers.py
ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singers
データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.py
ファイルを保存します。
- 次のコードを
spark-submit
でsingers.py
を実行して、Spanner のSingers
テーブルを作成します。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
次のように置き換えます。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connector
リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
次のように出力されます。
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
Scala
クラスタで Scala サンプルコードを実行する手順は次のとおりです。
- SSH を使用して Dataproc クラスタ マスターノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある
SSH
をクリックします。マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 事前にインストールされている
vi
、vim
、nano
テキスト エディタを使用して、マスターノードにsingers.scala
ファイルを作成します。- 次のコードを
singers.scala
ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
次のように置き換えます。
- PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME:
Singers
データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
singers.scala
ファイルを保存します。
- 次のコードを
spark-shell
REPL を起動します。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
次のように置き換えます。
CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の
GoogleCloudDataproc/spark-spanner-connector
リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。:load singers.scala
コマンドでsingers.scala
を実行して、Spanner のSingers
テーブルを作成します。出力リストには、Singers 出力の例が表示されます。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Spanner グラフを読み取る
Spanner コネクタは、グラフを個別のノードとエッジの DataFrame にエクスポートするだけでなく、GraphFrames
に直接エクスポートすることもサポートしています。
次の例では、Spanner を GraphFrame
にエクスポートします。Spanner コネクタ jar に含まれている Python SpannerGraphConnector
クラスを使用して、Spanner Graph を読み取ります。
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
次のように置き換えます。
- CONNECTOR_VERSION: Spanner コネクタのバージョン。
GitHub の
GoogleCloudDataproc/spark-spanner-connector
リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。 - PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
- INSTANCE_ID、DATABASE_ID、TABLE_NAME インスタンス ID、データベース ID、グラフ ID を挿入します。
GraphFrames ではなくノードとエッジ DataFrames
をエクスポートするには、代わりに load_dfs
を使用します。
df_vertices, df_edges, df_id_map = connector.load_dfs()
クリーンアップ
Google Cloud アカウントに継続的に課金されないようにするには、Dataproc クラスタを停止または削除し、Spanner インスタンスを削除します。
次のステップ
pyspark.sql.DataFrame
の例を見る。- Spark DataFrame 言語のサポートについては、以下をご覧ください。
- GitHub の Spark Spanner Connector リポジトリを参照する。
- Spark ジョブ調整のヒントを確認する。