Halaman ini menunjukkan cara membuat cluster Dataproc yang menggunakan Spark Spanner Connector untuk membaca data dari Spanner menggunakan Apache Spark
Menghitung biaya
Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:
- Dataproc
- Spanner
- Cloud Storage
Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda,
gunakan kalkulator harga.
Sebelum memulai
Sebelum menggunakan konektor Spanner dalam tutorial ini, siapkan cluster Dataproc serta instance dan database Spanner.
Menyiapkan cluster Dataproc
Buat cluster Dataproc atau gunakan cluster Dataproc yang ada yang memiliki setelan berikut:
Izin akun layanan VM. Akun layanan VM kluster harus diberi izin Spanner yang sesuai. Jika Anda menggunakan Data Boost (Data Boost diaktifkan dalam kode contoh di Membaca data dari Spanner), akun layanan VM juga harus memiliki izin IAM Data Boost yang diperlukan.
Cakupan akses. Cluster harus dibuat dengan mengaktifkan cakupan
cloud-platform
atau cakupanspanner
yang sesuai. Cakupancloud-platform
diaktifkan secara default untuk cluster yang dibuat dengan image versi 2.1 atau yang lebih tinggi.Petunjuk berikut menunjukkan cara menetapkan cakupan
cloud-platform
sebagai bagian dari permintaan pembuatan cluster yang menggunakan Konsol Google Cloud, gcloud CLI, atau Dataproc API. Untuk petunjuk pembuatan cluster tambahan, lihat Membuat cluster.Konsol Google Cloud
- Di konsol Google Cloud, buka halaman Dataproc Create a cluster.
- Di panel Kelola keamanan di bagian Akses project, klik "Memungkinkan cakupan cloud-platform untuk cluster ini".
- Isi atau konfirmasi kolom pembuatan cluster lainnya, lalu klik Create.
gcloud CLI
Anda dapat menjalankan perintah
gcloud dataproc clusters create
berikut untuk membuat cluster dengan cakupancloud-platform
yang diaktifkan.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Anda dapat menentukan GceClusterConfig.serviceAccountScopes sebagai bagian dari permintaan clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Menyiapkan instance Spanner dengan tabel database Singers
Buat instance Spanner
dengan database yang berisi tabel Singers
. Catat ID instance
dan ID database Spanner.
Menggunakan konektor Spanner dengan Spark
Konektor Spanner tersedia untuk Spark versi 3.1+
.
Anda menentukan versi konektor sebagai bagian dari spesifikasi file JAR konektor Cloud Storage saat mengirimkan tugas ke cluster Dataproc.
Contoh: pengiriman tugas Spark gcloud CLI dengan konektor Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Ganti kode berikut:
CONNECTOR_VERSION: Versi konektor Spanner.
Pilih versi konektor Spanner dari daftar versi di repositori GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Membaca data dari Spanner
Anda dapat menggunakan Python atau Scala untuk membaca data dari Spanner ke dalam Dataframe Spark menggunakan Spark data source API.
PySpark
Anda dapat menjalankan contoh kode PySpark di bagian ini di cluster dengan mengirimkan tugas ke
layanan Dataproc atau dengan menjalankan tugas dari REPL spark-submit
di node master cluster.
Tugas Dataproc
- Buat file
singers.py
menggunakan editor teks lokal atau di Cloud Shell menggunakan editor teksvi
,vim
, ataunano
yang telah diinstal sebelumnya. - Tempel kode berikut ke dalam file
singers.py
. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak yang hampir tidak ada pada instance Spanner utama.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
- INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat
Menyiapkan instance Spanner dengan tabel database
Singers
.
- Simpan file
singers.py
. - Kirim tugas
ke layanan Dataproc menggunakan Konsol Google Cloud, gcloud CLI, atau
Dataproc API.
Contoh: pengiriman tugas gcloud CLI dengan konektor Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Ganti kode berikut:
- CLUSTER_NAME: Nama cluster baru.
- REGION: Region Compute Engine yang tersedia untuk menjalankan beban kerja.
- CONNECTOR_VERSION: Versi konektor Spanner.
Pilih versi konektor Spanner dari daftar versi di repositori GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Tugas spark-submit
- Hubungkan ke node master cluster Dataproc menggunakan SSH.
- Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda.
- Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik
SSH
di sebelah kanan nama node master cluster.Jendela browser akan terbuka di direktori beranda Anda di node master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat file
singers.py
di node master menggunakan editor teksvi
,vim
, ataunano
yang telah diinstal sebelumnya.- Tempel kode berikut ke dalam file
singers.py
. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak yang hampir tidak ada pada instance Spanner utama.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
- INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat
Menyiapkan instance Spanner dengan tabel database
Singers
.
- Simpan file
singers.py
.
- Tempel kode berikut ke dalam file
- Jalankan
singers.py
denganspark-submit
untuk membuat tabelSingers
Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Ganti kode berikut:
- CONNECTOR_VERSION: Versi konektor Spanner.
Pilih versi konektor Spanner dari daftar versi di repositori GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Output-nya adalah:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Versi konektor Spanner.
Pilih versi konektor Spanner dari daftar versi di repositori GitHub
Scala
Untuk menjalankan contoh kode Scala di cluster Anda, selesaikan langkah-langkah berikut:
- Hubungkan ke node master cluster Dataproc menggunakan SSH.
- Buka halaman Clusters Dataproc di konsol Google Cloud, lalu klik nama cluster Anda.
- Di halaman Cluster details, pilih tab VM Instances. Kemudian, klik
SSH
di sebelah kanan nama node master cluster.Jendela browser akan terbuka di direktori beranda Anda di node master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Buat file
singers.scala
di node master menggunakan editor teksvi
,vim
, ataunano
yang telah diinstal sebelumnya.- Tempel kode berikut ke dalam file
singers.scala
. Perhatikan bahwa fitur Data Boost Spanner diaktifkan, yang memiliki dampak hampir nol pada instance Spanner utama.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Anda. Project ID tercantum di bagian Project info di Dasbor konsol Google Cloud.
- INSTANCE_ID, DATABASE_ID, dan TABLE_NAME : Lihat
Menyiapkan instance Spanner dengan tabel database
Singers
.
- Simpan file
singers.scala
.
- Tempel kode berikut ke dalam file
- Luncurkan REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Ganti kode berikut:
CONNECTOR_VERSION: Versi konektor Spanner. Pilih versi konektor Spanner dari daftar versi di repositori GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Jalankan
singers.scala
dengan perintah:load singers.scala
untuk membuat tabelSingers
Spanner. Listingan output menampilkan contoh dari output Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Pembersihan
Agar tidak menimbulkan biaya berkelanjutan pada akun Google Cloud , Anda dapat menghentikan atau menghapus cluster Dataproc dan menghapus instance Spanner.