Menggunakan konektor Spark Spanner

Halaman ini menunjukkan cara membuat cluster Dataproc yang menggunakan konektor Spark Spanner untuk membaca data dari Spanner menggunakan Apache Spark

Konektor Spanner berfungsi dengan Spark untuk membaca data dari database Spanner menggunakan library Java Spanner. Konektor Spanner mendukung pembacaan tabel dan grafik Spanner ke dalam DataFrames dan GraphFrames Spark.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Dataproc
  • Spanner
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Sebelum menggunakan konektor Spanner dalam tutorial ini, siapkan cluster Dataproc dan instance dan database Spanner.

Siapkan cluster Dataproc

Buat cluster Dataproc atau gunakan cluster Dataproc yang ada dengan setelan berikut:

  • Izin akun layanan VM. Akun layanan VM cluster harus diberi izin Spanner yang sesuai. Jika Anda menggunakan Data Boost (Data Boost diaktifkan dalam contoh kode di Mengekspor tabel Spanner), akun layanan VM juga harus memiliki izin IAM Data Boost yang diperlukan.

  • Cakupan akses. Cluster harus dibuat dengan mengaktifkan cakupan cloud-platform atau cakupan spanner yang sesuai. Cakupan cloud-platform diaktifkan secara default untuk cluster yang dibuat dengan versi image 2.1 atau yang lebih tinggi.

    Petunjuk berikut menunjukkan cara menetapkan cakupan cloud-platform sebagai bagian dari permintaan pembuatan cluster yang menggunakan konsol Google Cloud , gcloud CLI, atau Dataproc API. Untuk petunjuk pembuatan cluster tambahan, lihat Membuat cluster.

    Google Cloud console

    1. Di konsol Google Cloud , buka halaman Dataproc Create a cluster.
    2. Di panel Kelola keamanan di bagian Akses project, klik "Memungkinkan cakupan cloud-platform untuk cluster ini".
    3. Isi atau konfirmasi kolom pembuatan cluster lainnya, lalu klik Buat.

    gcloud CLI

    Anda dapat menjalankan perintah gcloud dataproc clusters create berikut untuk membuat cluster dengan cakupan cloud-platform yang diaktifkan.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Menyiapkan instance Spanner dengan tabel database Singers

Buat instance Spanner dengan database yang berisi tabel Singers. Catat ID instance Spanner dan ID database.

Menggunakan konektor Spanner dengan Spark

Konektor Spanner tersedia untuk Spark versi 3.1+. Anda menentukan versi konektor sebagai bagian dari spesifikasi file JAR konektor Cloud Storage saat Anda mengirimkan tugas ke cluster Dataproc.

Contoh: Pengiriman tugas Spark gcloud CLI dengan konektor Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Ganti kode berikut:

CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

Membaca tabel Spanner

Anda dapat menggunakan Python atau Scala untuk membaca data tabel Spanner ke dalam DataFrame Spark menggunakan Spark Data Source API.

PySpark

Anda dapat menjalankan contoh kode PySpark di bagian ini pada cluster dengan mengirimkan tugas ke layanan Dataproc atau dengan menjalankan tugas dari REPL spark-submit di node master cluster.

Tugas Dataproc

  1. Buat file singers.py menggunakan editor teks lokal atau di Cloud Shell menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Setelah mengisi variabel placeholder, tempelkan kode berikut ke dalam file singers.py. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang hampir tidak berdampak pada instance Spanner utama.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Ganti kode berikut:

      1. PROJECT_ID: ID project Google Cloud Anda. Project ID tercantum di bagian Project info di Google Cloud Dasbor konsol.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.py.
  2. Kirimkan tugas ke layanan Dataproc menggunakan konsol Google Cloud , gcloud CLI, atau Dataproc API.

    Contoh: Pengiriman tugas gcloud CLI dengan konektor Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Ganti kode berikut:

    1. CLUSTER_NAME: Nama cluster baru.
    2. REGION: Region Compute Engine yang tersedia untuk menjalankan beban kerja.
    3. CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

Tugas spark-submit

  1. Hubungkan ke node master cluster Dataproc menggunakan SSH.
    1. Buka halaman Clusters Dataproc di konsol Google Cloud , lalu klik nama cluster Anda.
    2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster.
      Screenshot halaman detail Cluster Dataproc di konsol Google Cloud , yang menampilkan tombol SSH yang digunakan untuk terhubung ke node master cluster.

      Jendela browser akan terbuka di direktori beranda Anda di node master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Buat file singers.py di node master menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Tempelkan kode berikut ke dalam file singers.py. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang hampir tidak berdampak pada instance Spanner utama.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Ganti kode berikut:

      1. PROJECT_ID: ID project Google Cloud Anda. Project ID tercantum di bagian Project info di Google Cloud Dasbor konsol.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.py.
  3. Jalankan singers.py dengan spark-submit untuk membuat tabel Singers Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Ganti kode berikut:

    1. CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

    Outputnya adalah:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Untuk menjalankan contoh kode Scala di cluster Anda, selesaikan langkah-langkah berikut:

  1. Hubungkan ke node master cluster Dataproc menggunakan SSH.
    1. Buka halaman Clusters Dataproc di konsol Google Cloud , lalu klik nama cluster Anda.
    2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster. Halaman detail Cluster Dataproc di konsol Google Cloud .

      Jendela browser akan terbuka di direktori beranda Anda di node master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Buat file singers.scala di node master menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Tempelkan kode berikut ke dalam file singers.scala. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang hampir tidak berdampak pada instance Spanner utama.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Ganti kode berikut:

      1. PROJECT_ID: ID project Google Cloud Anda. Project ID tercantum di bagian Project info di Google Cloud Dasbor konsol.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.scala.
  3. Luncurkan REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Ganti kode berikut:

    CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Jalankan singers.scala dengan perintah :load singers.scala untuk membuat tabel Singers Spanner. Output listing menampilkan contoh dari output Penyanyi.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Membaca grafik Spanner

Konektor Spanner mendukung ekspor grafik ke dalam DataFrames node dan edge terpisah, serta ekspor langsung ke GraphFrames.

Contoh berikut mengekspor Spanner ke GraphFrame. Proses ini menggunakan class SpannerGraphConnector Python, yang disertakan dalam jar konektor Spanner, untuk membaca Spanner Graph.

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

Ganti kode berikut:

  • CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.
  • PROJECT_ID: ID project Google Cloud Anda. Project ID tercantum di bagian Project info di Google Cloud Dasbor konsol.
  • INSTANCE_ID, DATABASE_ID, dan TABLE_NAME Insert the instance, database, and graph IDs.

Untuk mengekspor node dan tepi DataFrames, bukan GraphFrames, gunakan load_dfs sebagai gantinya:

df_vertices, df_edges, df_id_map = connector.load_dfs()

Pembersihan

Agar tidak menimbulkan biaya berkelanjutan pada akun Google Cloud Anda, Anda dapat menghentikan atau menghapus cluster Dataproc dan menghapus instance Spanner.

Langkah berikutnya