Spark で Spanner コネクタを使用する

このページでは、Spark Spanner コネクタを使用して、Apache SparkSpanner からデータを読み取る方法について説明します。

費用の計算

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Dataproc
  • Spanner
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

チュートリアルを実行する前に、コネクタのバージョンを確認し、コネクタ URI を取得します。

コネクタ JAR ファイルの URI を指定する方法

Spark Spanner コネクタのバージョンは、GitHub の GoogleCloudDataproc/spark-spanner-connector リポジトリに記載されています。

コネクタ JAR ファイルは、次の URI 文字列のコネクタ バージョン情報を置き換えて指定します。

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

このコネクタは Spark バージョン 3.1+ で使用できます。

gcloud CLI の例:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Spanner データベースを準備する

Spanner テーブルがない場合は、チュートリアルに沿って Spanner テーブルを作成できます。その後、インスタンス ID、データベース ID、テーブル Singers が作成されます。

Dataproc クラスタの作成

コネクタを使用する Dataproc クラスタには、spanner または cloud-platformスコープが必要です。Dataproc クラスタのデフォルトのスコープは、イメージ 2.1 以降では cloud-platform です。以前のバージョンを使用している場合は、Google Cloud コンソール、Google Cloud CLI、Dataproc API を使用して Dataproc クラスタを作成できます。

Console

  1. Google Cloud コンソールで、Dataproc の [クラスタの作成] ページを開きます。
  2. [セキュリティの管理] タブで、[プロジェクト アクセス] セクションの [このクラスタのクラウド プラットフォーム スコープを有効にします] をクリックします。
  3. クラスタ作成の他のフィールドへの入力および確認を行い、[作成] をクリックします。

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

GceClusterConfig.serviceAccountScopes は、clusters.create リクエストの一部として指定できます。次に例を示します。
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

対応する Spanner 権限Dataproc VM サービス アカウントに割り当てられていることを確認する必要があります。このチュートリアルで Data Boost を使用する場合は、Data Boost IAM 権限をご覧ください。

Spanner からデータを読み取る

Scala と Python を使用すると、Spark データソース API を使用して Spanner から Spark Dataframe までのデータを読み取ることができます。

Scala

  1. コードを確認し、[projectId][instanceId][databaseId][table] のプレースホルダを、前に作成したプロジェクト ID、インスタンス ID、データベース ID、テーブルに置き換えます。enableDataBoost オプションを使用すると、Spanner の Data Boost 機能が有効になります。これはメインの Spanner インスタンスへの影響がほぼゼロです。
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. クラスタ上でコードを実行する
    1. SSH を使用して Dataproc クラスタ マスター ノードに接続します。
      1. Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
        Cloud コンソールの Dataproc [クラスタ] ページ。
      2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタのマスターノード名の右側にある SSH をクリックします。
        Cloud コンソールの Dataproc [クラスタの詳細] ページ。

        マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivimnano テキスト エディタで singers.scala を作成し、Scala コードの一覧から Scala コードを貼り付けます。
      nano singers.scala
        
    3. spark-shell REPL を起動します。
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. :load singers.scala コマンドで singers.scala を実行して、Spanner の Singers テーブルを作成します。出力リストには、Singers 出力の例が表示されます。
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. コードを確認し、[projectId][instanceId][databaseId][table] のプレースホルダを、前に作成したプロジェクト ID、インスタンス ID、データベース ID、テーブルに置き換えます。enableDataBoost オプションを使用すると、Spanner の Data Boost 機能が有効になります。これはメインの Spanner インスタンスへの影響がほぼゼロです。
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. クラスタ上でコードを実行する
    1. SSH を使用して Dataproc クラスタ マスター ノードに接続します。
      1. Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
        Cloud コンソールの [クラスタ] ページ。
      2. [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある SSH をクリックします。
        Cloud コンソールの [クラスタの詳細] ページで、クラスタ名の行の [SSH] を選択します。

        プライマリ ノードのホーム ディレクトリでブラウザ ウィンドウが開きます。
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 事前にインストールされた vivim、または nano テキスト エディタで singers.py を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。
      nano singers.py
      
    3. spark-submit で singers.py を実行し、Spanner の Singers テーブルを作成します。
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      次のように出力されます。
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

クリーンアップ

このチュートリアルで作成したリソースについて、Google Cloud アカウントへの継続的な課金を回避する手順は、次のとおりです。

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

自然言語処理についてや、