Menggunakan konektor Spanner dengan Spark

Halaman ini menunjukkan cara membuat cluster Dataproc yang menggunakan Spark Spanner Connector untuk membaca data dari Spanner menggunakan Apache Spark

Menghitung biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Dataproc
  • Spanner
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Sebelum menggunakan konektor Spanner dalam tutorial ini, siapkan cluster Dataproc serta instance dan database Spanner.

Menyiapkan cluster Dataproc

Buat cluster Dataproc atau gunakan cluster Dataproc yang ada yang memiliki setelan berikut:

  • Izin akun layanan VM. Akun layanan VM kluster harus diberi izin Spanner yang sesuai. Jika Anda menggunakan Data Boost (Data Boost diaktifkan dalam kode contoh di Membaca data dari Spanner), akun layanan VM juga harus memiliki izin IAM Data Boost yang diperlukan.

  • Cakupan akses. Cluster harus dibuat dengan mengaktifkan cakupan cloud-platform atau cakupan spanner yang sesuai. Cakupan cloud-platform diaktifkan secara default untuk cluster yang dibuat dengan image versi 2.1 atau yang lebih tinggi.

    Petunjuk berikut menunjukkan cara menetapkan cakupan cloud-platform sebagai bagian dari permintaan pembuatan cluster yang menggunakan Konsol Google Cloud, gcloud CLI, atau Dataproc API. Untuk petunjuk pembuatan cluster tambahan, lihat Membuat cluster.

    Konsol Google Cloud

    1. Di konsol Google Cloud, buka halaman Dataproc Create a cluster.
    2. Di panel Kelola keamanan di bagian Akses project, klik "Memungkinkan cakupan cloud-platform untuk cluster ini".
    3. Isi atau konfirmasi kolom pembuatan cluster lainnya, lalu klik Create.

    gcloud CLI

    Anda dapat menjalankan perintah gcloud dataproc clusters create berikut untuk membuat cluster dengan cakupan cloud-platform yang diaktifkan.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Menyiapkan instance Spanner dengan tabel database Singers

Buat instance Spanner dengan database yang berisi tabel Singers. Catat ID instance dan ID database Spanner.

Menggunakan konektor Spanner dengan Spark

Konektor Spanner tersedia untuk Spark versi 3.1+. Anda menentukan versi konektor sebagai bagian dari spesifikasi file JAR konektor Cloud Storage saat mengirimkan tugas ke cluster Dataproc.

Contoh: pengiriman tugas Spark gcloud CLI dengan konektor Spanner.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Ganti kode berikut:

CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

Membaca data dari Spanner

Anda dapat menggunakan Python atau Scala untuk membaca data dari Spanner ke dalam Dataframe Spark menggunakan Spark data source API.

PySpark

Anda dapat menjalankan contoh kode PySpark di bagian ini di cluster dengan mengirimkan tugas ke layanan Dataproc atau dengan menjalankan tugas dari REPL spark-submit di node master cluster.

Tugas Dataproc

  1. Buat file singers.py menggunakan editor teks lokal atau di Cloud Shell menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Tempel kode berikut ke dalam file singers.py. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak yang hampir tidak ada pada instance Spanner utama.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Ganti kode berikut:

      1. PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.py.
  2. Kirim tugas ke layanan Dataproc menggunakan Konsol Google Cloud, gcloud CLI, atau Dataproc API.

    Contoh: pengiriman tugas gcloud CLI dengan konektor Spanner.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Ganti kode berikut:

    1. CLUSTER_NAME: Nama cluster baru.
    2. REGION: Region Compute Engine yang tersedia untuk menjalankan beban kerja.
    3. CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

Tugas spark-submit

  1. Hubungkan ke node master cluster Dataproc menggunakan SSH.
    1. Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda.
    2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster.
      Halaman detail Cluster Dataproc di Cloud Console.

      Jendela browser akan terbuka di direktori beranda Anda di node master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Buat file singers.py di node master menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Tempel kode berikut ke dalam file singers.py. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak yang hampir tidak ada pada instance Spanner utama.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Ganti kode berikut:

      1. PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.py.
  3. Jalankan singers.py dengan spark-submit untuk membuat tabel Singers Spanner.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Ganti kode berikut:

    1. CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

    Output-nya adalah:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

Untuk menjalankan contoh kode Scala di cluster Anda, selesaikan langkah-langkah berikut:

  1. Hubungkan ke node master cluster Dataproc menggunakan SSH.
    1. Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda.
    2. Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik SSH di sebelah kanan nama node master cluster.
      Halaman detail Cluster Dataproc di Cloud Console.

      Jendela browser akan terbuka di direktori beranda Anda di node master.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Buat file singers.scala di node master menggunakan editor teks vi, vim, atau nano yang telah diinstal sebelumnya.
    1. Tempel kode berikut ke dalam file singers.scala. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak hampir nol pada instance Spanner utama.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Ganti kode berikut:

      1. PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
      2. INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat Menyiapkan instance Spanner dengan tabel database Singers.
    2. Simpan file singers.scala.
  3. Luncurkan REPL spark-shell.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Ganti kode berikut:

    CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub GoogleCloudDataproc/spark-spanner-connector.

  4. Jalankan singers.scala dengan perintah :load singers.scala untuk membuat tabel Singers Spanner. Listingan output menampilkan contoh dari output Singers.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Pembersihan

Agar tidak menimbulkan biaya berkelanjutan pada akun Google Cloud , Anda dapat menghentikan atau menghapus cluster Dataproc dan menghapus instance Spanner.

Untuk informasi selengkapnya