Menggunakan konektor Spanner dengan Spark

Halaman ini menunjukkan cara menggunakan Spark Spanner Connector untuk membaca data dari Spanner menggunakan Apache Spark

Menghitung biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Dataproc
  • Spanner
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Sebelum menjalankan tutorial, pastikan Anda mengetahui versi konektor dan mendapatkan URI konektor.

Cara menentukan URI file JAR konektor

Versi konektor Spark Spanner tercantum di repositori GoogleCloudDataproc/spark-spanner-connector GitHub.

Tentukan file JAR konektor dengan mengganti informasi versi konektor dalam string URI berikut:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Konektor tersedia untuk versi Spark 3.1+

Contoh gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Menyiapkan database Spanner

Jika tidak memiliki tabel Spanner, Anda dapat mengikuti tutorial untuk membuat tabel Spanner. Setelah itu, Anda akan memiliki ID instance, ID database, dan tabel Singers.

Membuat cluster Dataproc

Semua cluster Dataproc yang menggunakan konektor memerlukan cakupan spanner atau cloud-platform. Cluster Dataproc memiliki cakupan default cloud-platform untuk gambar 2.1 atau yang lebih tinggi. Jika menggunakan versi lama, Anda dapat menggunakan Konsol Google Cloud, Google Cloud CLI, dan Dataproc API untuk membuat cluster Dataproc.

Konsol

  1. Di konsol Google Cloud, buka halaman Create a cluster pada Dataproc
  2. Di tab "Kelola keamanan", klik "Aktifkan cakupan cloud-platform untuk cluster ini" di bagian "Akses project".
  3. Selesaikan pengisian atau konfirmasi kolom pembuatan cluster lainnya, lalu klik "Buat".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create. Contoh:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Anda harus memastikan izin Spanner yang sesuai ditetapkan ke akun layanan VM Dataproc. Jika Anda menggunakan Peningkatan Data dalam tutorial, lihat Izin IAM Peningkatan Data

Membaca data dari Spanner

Anda dapat menggunakan Scala dan Python untuk membaca data dari Spanner ke dalam DataFrame Spark menggunakan API sumber data Spark.

Scala

  1. Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan project ID, ID instance, ID database, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak hampir nol pada instance Spanner utama.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Jalankan kode di cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
        Halaman cluster Dataproc di Konsol Cloud.
      2. Di halaman >Cluster details, pilih tab VM Instances. Lalu, klik SSH di sebelah kanan nama node master cluster
        Halaman detail Cluster Dataproc di konsol Cloud.

        Jendela browser akan terbuka di direktori beranda Anda pada node master
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat singers.scala dengan editor teks vi, vim, atau nano yang telah terinstal, lalu tempelkan kode Scala dari listingan kode Scala
      nano singers.scala
        
    3. Luncurkan REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Jalankan makers.scala dengan perintah :load singers.scala untuk membuat tabel Singers Spanner. Listingan output menampilkan contoh dari output Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan project ID, ID instance, ID database, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak hampir nol pada instance Spanner utama.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Jalankan kode pada cluster Anda
    1. Gunakan SSH untuk terhubung ke node master cluster Dataproc
      1. Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
        Halaman cluster di Konsol Cloud.
      2. Di halaman Cluster details, pilih tab VM Instances. Lalu, klik SSH di sebelah kanan nama node master cluster
        Pilih SSH pada baris nama cluster pada halaman detail Cluster di Cloud Console.

        Jendela browser akan terbuka di direktori beranda Anda pada node utama
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Buat singers.py dengan editor teks vi, vim, atau nano yang telah diinstal sebelumnya, lalu tempel kode PySpark dari listingan kode PySpark
      nano singers.py
      
    3. Jalankan penyanyi.py dengan spark-submit untuk membuat tabel Singers Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      Output-nya adalah:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Pembersihan

Guna melakukan pembersihan dan menghindari timbulnya biaya berkelanjutan pada akun Google Cloud Anda untuk resource yang dibuat dalam panduan ini, ikuti langkah-langkah berikut.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Untuk informasi selengkapnya