Spark Spanner コネクタを使用する

このページでは、Spark Spanner コネクタを使用して、Apache SparkSpanner からデータを読み取る Dataproc クラスタを作成する方法について説明します。

Spanner コネクタは Spark と連携して、Spanner Java ライブラリを使用して Spanner データベースからデータを読み取ります。Spanner コネクタは、Spanner のテーブルグラフを Spark の DataFrameGraphFrame に読み込むことをサポートしています。

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

  • Dataproc
  • Spanner
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

このチュートリアルで Spanner コネクタを使用する前に、Dataproc クラスタSpanner インスタンスとデータベースを設定します。

Dataproc クラスタを設定する

Dataproc クラスタを作成するか、次の設定を持つ既存の Dataproc クラスタを使用します。

  • VM サービス アカウントの権限。クラスタの VM サービス アカウントには、適切な Spanner 権限を割り当てる必要があります。Data Boost を使用する場合(Spanner テーブルをエクスポートするのサンプルコードでは Data Boost が有効になっています)、VM サービス アカウントに必要な Data Boost IAM 権限も付与されている必要があります。

  • アクセス スコープ。クラスタは、cloud-platform スコープまたは適切な spanner スコープを有効にして作成する必要があります。イメージ バージョン 2.1 以降で作成されたクラスタでは、cloud-platform スコープがデフォルトで有効になっています。

    次の手順では、 Google Cloud コンソール、gcloud CLI、または Dataproc API を使用するクラスタ作成リクエストの一部として cloud-platform スコープを設定する方法について説明します。クラスタの作成手順については、クラスタを作成するをご覧ください。

    Google Cloud コンソール

    1. Google Cloud コンソールで、Dataproc の [クラスタの作成] ページを開きます。
    2. [セキュリティ管理] パネルの [プロジェクト アクセス] セクションで、[このクラスタのクラウド プラットフォーム スコープを有効にする] をクリックします。
    3. クラスタ作成の他のフィールドに入力するか、確認して、[作成] をクリックします。

    gcloud CLI

    次の gcloud dataproc clusters create コマンドを実行して、cloud-platform スコープが有効なクラスタを作成できます。

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    clusters.create リクエストの一部として GceClusterConfig.serviceAccountScopes を指定できます。

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Singers データベース テーブルを使用して Spanner インスタンスを設定する

Singers テーブルを含むデータベースを使用して Spanner インスタンスを作成します。Spanner インスタンス ID とデータベース ID をメモします。

Spark で Spanner コネクタを使用する

Spanner コネクタは、Spark バージョン 3.1+ で使用できます。ジョブを送信するときに、Cloud Storage コネクタ JAR ファイル仕様の一部としてコネクタ バージョンを指定します。

例: Spanner コネクタを使用した gcloud CLI Spark ジョブの送信。

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

次のように置き換えます。

CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

Spanner テーブルを読み取る

Python または Scala を使用して、Spark データソース API を使用して Spanner テーブルデータを Spark Dataframe に読み取ることが可能です。

PySpark

このセクションの PySpark コードの例をクラスタで実行するには、Dataproc サービスにジョブを送信するか、クラスタ マスターノードの spark-submit REPL からジョブを実行します。

Dataproc ジョブ

  1. ローカル テキスト エディタを使用するか、Cloud Shell でプリインストールされている vivim、または nano テキスト エディタを使用して singers.py ファイルを作成します。
    1. プレースホルダ変数に値を入力したら、次のコードを singers.py ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.py ファイルを保存します。
  2. Google Cloud コンソール、gcloud CLI、または Dataproc API を使用して、Dataproc サービスにジョブを送信します。

    例: Spanner コネクタを使用した gcloud CLI ジョブの送信。

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    次のように置き換えます。

    1. CLUSTER_NAME: 新しいクラスタの名前。
    2. REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
    3. CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

spark-submit ジョブ

  1. SSH を使用して Dataproc クラスタ マスターノードに接続します。
    1. Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
    2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある SSH をクリックします。
       Google Cloud コンソールの Dataproc クラスタの詳細ページのスクリーンショット。クラスタ マスターノードに接続するために使用される SSH ボタンが表示されています。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 事前にインストールされている vivimnano テキスト エディタを使用して、マスターノードに singers.py ファイルを作成します。
    1. 次のコードを singers.py ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.py ファイルを保存します。
  3. spark-submitsingers.py を実行して、Spanner の Singers テーブルを作成します。
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    次のように置き換えます。

    1. CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

    次のように出力されます。

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

クラスタで Scala サンプルコードを実行する手順は次のとおりです。

  1. SSH を使用して Dataproc クラスタ マスターノードに接続します。
    1. Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタ名をクリックします。
    2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスターノード名の右側にある SSH をクリックします。 Google Cloud コンソールの Dataproc の [クラスタの詳細] ページ。

      マスターノード上のホーム ディレクトリでブラウザ ウィンドウが開きます。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 事前にインストールされている vivimnano テキスト エディタを使用して、マスターノードに singers.scala ファイルを作成します。
    1. 次のコードを singers.scala ファイルに貼り付けます。Spanner の Data Boost 機能が有効になっています。これはメインの Spanner インスタンスへの影響がほぼゼロです。
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      次のように置き換えます。

      1. PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME: Singers データベース テーブルを使用して Spanner インスタンスを設定するをご覧ください。
    2. singers.scala ファイルを保存します。
  3. spark-shell REPL を起動します。
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    次のように置き換えます。

    CONNECTOR_VERSION: Spanner コネクタのバージョン。GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。

  4. :load singers.scala コマンドで singers.scala を実行して、Spanner の Singers テーブルを作成します。出力リストには、Singers 出力の例が表示されます。
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Spanner グラフを読み取る

Spanner コネクタは、グラフを個別のノードとエッジの DataFrame にエクスポートするだけでなく、GraphFrames に直接エクスポートすることもサポートしています。

次の例では、Spanner を GraphFrame にエクスポートします。Spanner コネクタ jar に含まれている Python SpannerGraphConnector クラスを使用して、Spanner Graph を読み取ります。

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

次のように置き換えます。

  • CONNECTOR_VERSION: Spanner コネクタのバージョン。 GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリのバージョン リストから Spanner コネクタのバージョンを選択します。
  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。 プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • INSTANCE_IDDATABASE_IDTABLE_NAME インスタンス ID、データベース ID、グラフ ID を挿入します。

GraphFrames ではなくノードとエッジ DataFrames をエクスポートするには、代わりに load_dfs を使用します。

df_vertices, df_edges, df_id_map = connector.load_dfs()

クリーンアップ

Google Cloud アカウントに継続的に課金されないようにするには、Dataproc クラスタを停止または削除し、Spanner インスタンスを削除します。

次のステップ