Menulis pesan Pub/Sub Lite menggunakan Apache Spark
Konektor Spark Pub/Sub Lite adalah library klien Java open source yang mendukung penggunaan Pub/Sub Lite sebagai sumber input dan output untuk Streaming Terstruktur Apache Spark. Konektor berfungsi di semua distribusi Apache Spark, termasuk Dataproc.
Panduan memulai ini menunjukkan cara:
- membaca pesan dari Pub/Sub Lite
- menulis pesan ke Pub/Sub Lite
menggunakan PySpark dari cluster Spark Dataproc.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Siapkan
Buat variabel untuk project Anda.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Membuat bucket Cloud Storage. Nama bucket Cloud Storage harus unik secara global.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Buat topik dan langganan Pub/Sub Lite di lokasi yang didukung. Lihat Membuat topik jika Anda menggunakan reservasi Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Buat cluster Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: Region Dataproc yang didukung tempat topik dan langganan Pub/Sub Lite Anda berada.--image-version
: versi image cluster, yang menentukan versi Apache Spark yang diinstal di cluster. Pilih versi rilis image 2.x.x karena Konektor Spark Pub/Sub Lite saat ini mendukung Apache Spark 3.x.x.--scopes
: mengaktifkan akses API ke layanan Google Cloud dalam project yang sama.--enable-component-gateway
: mengaktifkan akses ke UI web Apache Spark.--bucket
: bucket Cloud Storage staging yang digunakan untuk menyimpan dependensi tugas cluster, output driver, dan file konfigurasi cluster.
Clone repositori memulai cepat dan buka direktori kode sampel:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Menulis ke Pub/Sub Lite
Contoh berikut akan:
- membuat
sumber kapasitas
yang menghasilkan angka dan stempel waktu berurutan yang diformat sebagai
spark.sql.Row
- mengubah data agar cocok dengan skema tabel yang diperlukan oleh API
writeStream
Konektor Spark Pub/Sub Lite - menulis data ke topik Pub/Sub Lite yang ada
Untuk mengirimkan tugas tulis ke Dataproc:
Konsol
- Upload skrip PySpark ke bucket Cloud Storage Anda.
- Buka konsol Cloud Storage.
- Pilih bucket Anda.
- Gunakan Upload file untuk mengupload skrip PySpark yang ingin Anda gunakan.
- Kirim tugas ke cluster Dataproc Anda:
- Buka konsol Dataproc.
- Buka lowongan.
- Klik Kirim tugas.
- Isi detail lowongan.
- Di bagian Cluster, pilih cluster Anda.
- Di bagian Tugas, beri nama pada ID tugas.
- Untuk Jenis tugas, pilih PySpark.
- Untuk File python utama, berikan URI penyimpanan gcloud dari skrip PySpark yang diupload yang dimulai dengan
gs://
. - Untuk file Jar, pilih versi konektor Spark terbaru dari Maven , cari jar dengan dependensi di opsi download, dan salin link-nya.
- Untuk Argumen, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang telah selesai, biarkan kosong. - Di bagian Properties, masukkan kunci
spark.master
dan nilaiyarn
. - Klik Kirim.
gcloud
Gunakan perintah gcloud dataproc jobs submit pyspark untuk mengirimkan tugas ke Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: region Dataproc yang telah dipilih sebelumnya.--cluster
: nama cluster Dataproc.--jars
: jar uber Pub/Sub Lite Spark Connector dengan dependensi di bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.--driver-log-levels
: menetapkan level logging ke INFO di tingkat root.--properties
: menggunakan pengelola resource YARN untuk master Spark.--
: memberikan argumen yang diperlukan oleh skrip.
Jika operasi writeStream
berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di konsol Google Cloud:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Membaca dari Pub/Sub Lite
Contoh berikut akan membaca pesan dari langganan Pub/Sub Lite yang ada menggunakan API readStream
. Konektor akan menghasilkan pesan yang sesuai dengan
skema tabel tetap
yang diformat sebagai
spark.sql.Row
.
Untuk mengirimkan tugas baca ke Dataproc:
Konsol
- Upload skrip PySpark ke bucket Cloud Storage Anda.
- Buka konsol Cloud Storage.
- Pilih bucket Anda.
- Gunakan Upload file untuk mengupload skrip PySpark yang ingin Anda gunakan.
- Kirim tugas ke cluster Dataproc Anda:
- Buka konsol Dataproc.
- Buka lowongan.
- Klik Kirim tugas.
- Isi detail lowongan.
- Di bagian Cluster, pilih cluster Anda.
- Di bagian Tugas, beri nama pada ID tugas.
- Untuk Jenis tugas, pilih PySpark.
- Untuk File python utama, berikan URI penyimpanan gcloud dari skrip PySpark yang diupload yang dimulai dengan
gs://
. - Untuk file Jar, pilih versi konektor Spark terbaru dari Maven , cari jar dengan dependensi di opsi download, dan salin link-nya.
- Untuk Argumen, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang telah selesai, biarkan kosong. - Di bagian Properties, masukkan kunci
spark.master
dan nilaiyarn
. - Klik Kirim.
gcloud
Gunakan perintah gcloud dataproc jobs submit pyspark lagi untuk mengirimkan tugas ke Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: region Dataproc yang telah dipilih sebelumnya.--cluster
: nama cluster Dataproc.--jars
: jar uber Pub/Sub Lite Spark Connector dengan dependensi di bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.--driver-log-levels
: menetapkan level logging ke INFO di tingkat root.--properties
: menggunakan pengelola resource YARN untuk master Spark.--
: memberikan argumen yang diperlukan untuk skrip.
Jika operasi readStream
berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di konsol Google Cloud:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Memutar ulang dan menghapus permanen pesan dari Pub/Sub Lite
Operasi pencarian tidak berfungsi saat membaca dari Pub/Sub Lite menggunakan Konektor Spark Pub/Sub Lite karena sistem Apache Spark melakukan pelacakan offsetnya sendiri dalam partisi. Solusinya adalah menghabiskan, mencari, dan memulai ulang alur kerja.
Pembersihan
Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.
Hapus topik dan langganan.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Hapus cluster Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Hapus bucket Cloud Storage.
gcloud storage rm gs://$BUCKET
Langkah selanjutnya
Lihat contoh penghitungan kata dalam Java untuk Konektor Spark Pub/Sub Lite.
Pelajari cara mengakses output driver tugas Dataproc.
Konektor Spark lainnya dari produk Google Cloud: konektor BigQuery, konektor Bigtable, konektor Cloud Storage.