Halaman ini menunjukkan cara menggunakan Spark Spanner Connector untuk membaca data dari Spanner menggunakan Apache Spark
Menghitung biaya
Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:
- Dataproc
- Spanner
- Cloud Storage
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Sebelum memulai
Sebelum menjalankan tutorial, pastikan Anda mengetahui versi konektor dan mendapatkan URI konektor.
Cara menentukan URI file JAR konektor
Versi konektor Spark Spanner tercantum di repositori GoogleCloudDataproc/spark-spanner-connector GitHub.
Tentukan file JAR konektor dengan mengganti informasi versi konektor dalam string URI berikut:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Konektor tersedia untuk versi Spark 3.1+
Contoh gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Menyiapkan database Spanner
Jika tidak memiliki tabel Spanner, Anda dapat mengikuti
tutorial untuk membuat
tabel Spanner. Setelah itu, Anda akan memiliki ID instance, ID database, dan tabel Singers
.
Membuat cluster Dataproc
Semua cluster Dataproc yang menggunakan konektor memerlukan cakupan spanner
atau cloud-platform
. Cluster Dataproc memiliki cakupan default cloud-platform
untuk gambar 2.1 atau yang lebih tinggi. Jika menggunakan versi lama, Anda dapat menggunakan Konsol Google Cloud, Google Cloud CLI, dan Dataproc API untuk membuat cluster Dataproc.
Konsol
- Di konsol Google Cloud, buka halaman Create a cluster pada Dataproc
- Di tab "Kelola keamanan", klik "Aktifkan cakupan cloud-platform untuk cluster ini" di bagian "Akses project".
- Selesaikan pengisian atau konfirmasi kolom pembuatan cluster lainnya, lalu klik "Buat".
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create. Contoh:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Anda harus memastikan izin Spanner yang sesuai ditetapkan ke akun layanan VM Dataproc. Jika Anda menggunakan Peningkatan Data dalam tutorial, lihat Izin IAM Peningkatan Data
Membaca data dari Spanner
Anda dapat menggunakan Scala dan Python untuk membaca data dari Spanner ke dalam DataFrame Spark menggunakan API sumber data Spark.
Scala
- Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan
project ID, ID instance, ID database, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak hampir nol pada instance Spanner utama.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Jalankan kode di cluster Anda
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
- Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
- Di halaman >Cluster details, pilih tab VM Instances. Lalu, klik
SSH
di sebelah kanan nama node master cluster
Jendela browser akan terbuka di direktori beranda Anda pada node masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat
singers.scala
dengan editor teksvi
,vim
, ataunano
yang telah terinstal, lalu tempelkan kode Scala dari listingan kode Scalanano singers.scala
- Luncurkan REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Jalankan makers.scala dengan perintah
:load singers.scala
untuk membuat tabelSingers
Spanner. Listingan output menampilkan contoh dari output Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Periksa kode dan ganti placeholder [projectId], [instanceId], [databaseId], dan [table] dengan
project ID, ID instance, ID database, dan tabel yang Anda buat sebelumnya. Opsi enableDataBoost mengaktifkan fitur Data Boost Spanner, yang memiliki dampak hampir nol pada instance Spanner utama.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Jalankan kode pada cluster Anda
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
- Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda
- Di halaman Cluster details, pilih tab VM Instances. Lalu, klik
SSH
di sebelah kanan nama node master cluster
Jendela browser akan terbuka di direktori beranda Anda pada node utamaConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat
singers.py
dengan editor teksvi
,vim
, ataunano
yang telah diinstal sebelumnya, lalu tempel kode PySpark dari listingan kode PySparknano singers.py
- Jalankan penyanyi.py dengan
spark-submit
untuk membuat tabelSingers
Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Output-nya adalah:... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Gunakan SSH untuk terhubung ke node master cluster Dataproc
Pembersihan
Guna melakukan pembersihan dan menghindari timbulnya biaya berkelanjutan pada akun Google Cloud Anda untuk resource yang dibuat dalam panduan ini, ikuti langkah-langkah berikut.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME