Menggunakan konektor Spanner dengan Spark

Halaman ini menunjukkan cara menggunakan Konektor Spanner Spark untuk membaca data dari Spanner menggunakan Apache Spark

Menghitung biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Dataproc
  • Spanner
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Sebelum menjalankan tutorial, pastikan Anda mengetahui versi konektor dan mendapatkan URI konektor.

Cara menentukan URI file JAR konektor

Versi konektor Spark Spanner tercantum di repositori GoogleCloudDataproc/spark-spanner-connector GitHub.

Tentukan file JAR konektor dengan mengganti informasi versi konektor dalam string URI berikut:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Konektor tersedia untuk Spark versi 3.1+

Contoh gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Menyiapkan database Spanner

Jika tidak memiliki tabel Spanner, Anda dapat mengikuti tutorial untuk membuat tabel Spanner. Setelah itu, Anda akan memiliki ID instance, ID database, dan Singers tabel.

Membuat cluster Dataproc

Setiap cluster Dataproc yang menggunakan konektor memerlukan cakupan spanner atau cloud-platform. Cluster Dataproc memiliki cakupan default cloud-platform untuk image 2.1 atau yang lebih tinggi. Jika menggunakan versi lama, Anda dapat menggunakan konsol Google Cloud, Google Cloud CLI, dan Dataproc API untuk membuat cluster Dataproc.

Konsol

  1. Di konsol Google Cloud, buka halaman Buat cluster Dataproc
  2. Di tab "Kelola keamanan", klik "Memungkinkan cakupan cloud-platform untuk cluster ini" di bagian "Akses project".
  3. Selesaikan pengisian atau konfirmasi kolom pembuatan cluster lainnya, lalu klik "Buat".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create. Contoh:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Anda harus memastikan izin Spanner yang sesuai ditetapkan ke akun layanan VM Dataproc. Jika Anda menggunakan Data Boost dalam tutorial, lihat Izin IAM Data Boost

Membaca data dari Spanner

Anda dapat menggunakan Scala dan Python untuk membaca data dari Spanner ke dalam Dataframe Spark menggunakan Spark data source API.

Scala

  1. Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan project ID, instance ID, database ID, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak yang hampir nol pada instance Spanner utama.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Menjalankan kode di cluster
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Cluster Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
        Halaman cluster Dataproc di Cloud Console.
      2. Di halaman >Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster
        Halaman detail Cluster Dataproc di Cloud Console.

        Jendela browser akan terbuka di direktori utama Anda di node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat singers.scala dengan editor teks vi, vim, atau nano bawaan, lalu tempel kode Scala dari listingan kode Scala
      nano singers.scala
        
    3. Luncurkan REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Jalankan singers.scala dengan perintah :load singers.scala untuk membuat tabel Singers Spanner. Listingan output menampilkan contoh dari output Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan project ID, instance ID, database ID, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak yang hampir nol pada instance Spanner utama.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Jalankan kode di cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Cluster Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
        Halaman Cluster di konsol Cloud.
      2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster
        Pilih SSH di baris nama cluster pada halaman Detail cluster di konsol Cloud.

        Jendela browser akan terbuka di direktori utama Anda di node utama
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat singers.py dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempel kode PySpark dari listingan kode PySpark
      nano singers.py
      
    3. Jalankan singers.py dengan spark-submit untuk membuat tabel Singers Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      Output-nya adalah:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Pembersihan

Untuk membersihkan dan menghindari tagihan berkelanjutan ke akun Google Cloud Anda untuk resource yang dibuat dalam panduan ini, ikuti langkah-langkah berikut.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Untuk informasi selengkapnya