Menulis pesan Pub/Sub Lite menggunakan Apache Spark
Konektor Pub/Sub Lite Spark adalah library klien Java open source yang mendukung penggunaan Pub/Sub Lite sebagai sumber input dan output untuk Streaming Terstruktur Apache Spark. Konektor berfungsi di semua distribusi Apache Spark, termasuk Dataproc.
Panduan memulai ini menunjukkan cara:
- membaca pesan dari Pub/Sub Lite
- menulis pesan ke Pub/Sub Lite
menggunakan PySpark dari cluster Dataproc Spark.
Sebelum memulai
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.
-
Make sure that billing is enabled for your Google Cloud project.
-
Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Penyiapan
Buat variabel untuk project Anda.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Membuat bucket Cloud Storage. Nama bucket Cloud Storage harus unik secara global.
export BUCKET=your-bucket-name
gsutil mb gs://$BUCKET
Buat topik dan langganan Pub/Sub Lite di lokasi yang didukung. Lihat Membuat topik jika Anda menggunakan reservasi Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Buat cluster Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: region Dataproc yang didukung tempat topik dan langganan Pub/Sub Lite Anda berada.--image-version
: versi image cluster, yang menentukan versi Apache Spark yang diinstal di cluster. Pilih versi rilis gambar 2.x.x karena Pub/Sub Lite Spark Connector saat ini mendukung Apache Spark 3.x.x.--scopes
: mengaktifkan akses API ke layanan Google Cloud di project yang sama.--enable-component-gateway
: mengaktifkan akses ke UI web Apache Spark.--bucket
: bucket Cloud Storage staging yang digunakan untuk menyimpan dependensi tugas cluster, output driver, dan file konfigurasi cluster.
Clone repositori panduan memulai dan buka direktori kode contoh:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Menulis ke Pub/Sub Lite
Contoh berikut akan:
- buat
sumber tarif
yang menghasilkan angka berurutan dan stempel waktu dengan format
spark.sql.Row
- ubah data agar sesuai dengan skema tabel yang diperlukan oleh
writeStream
API Pub/Sub Lite Spark Connector. - menulis data ke topik Pub/Sub Lite yang ada
Untuk mengirimkan tugas tulis ke Dataproc:
Konsol
- Upload skrip PySpark ke bucket Cloud Storage Anda.
- Buka konsol Cloud Storage.
- Pilih bucket Anda.
- Gunakan Upload files untuk mengupload skrip PySpark yang ingin Anda gunakan.
- Kirim tugas ke cluster Dataproc Anda:
- Buka konsol Dataproc.
- Buka tugas.
- Klik Kirim tugas.
- Isi detail pekerjaan.
- Di bagian Cluster, pilih cluster Anda.
- Di bagian Job, beri nama untuk ID tugas.
- Untuk Job type, pilih PySpark.
- Untuk File python utama, berikan URI gsutil dari skrip PySpark yang diupload
yang dimulai dengan
gs://
. - Untuk file Jar, pilih versi konektor Spark terbaru dari Maven, cari jar dengan dependensi dalam opsi download, lalu salin link-nya.
- Untuk Arguments, jika Anda menggunakan skrip PySpark lengkap dari
GitHub, masukkan
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID ; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang sudah selesai, biarkan kosong. - Di bagian Properties, masukkan kunci
spark.master
dan nilaiyarn
. - Klik Submit.
gcloud
Gunakan perintah gcloud dataproc jobs submit pyspark untuk mengirim tugas ke Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: region Dataproc yang telah dipilih sebelumnya.--cluster
: nama cluster Dataproc.--jars
: jar uber Pub/Sub Lite Spark Connector dengan dependensi dalam bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.--driver-log-levels
: menyetel level logging ke INFO di tingkat root.--properties
: menggunakan pengelola resource YARN untuk master Spark.--
: memberikan argumen yang diperlukan oleh skrip.
Jika operasi writeStream
berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di Konsol Google Cloud:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Membaca dari Pub/Sub Lite
Contoh berikut akan membaca pesan dari langganan
Pub/Sub Lite yang ada menggunakan
readStream
API. Konektor akan menghasilkan pesan yang sesuai dengan skema tabel tetap yang diformat sebagai spark.sql.Row
.
Untuk mengirimkan tugas baca ke Dataproc:
Konsol
- Upload skrip PySpark ke bucket Cloud Storage Anda.
- Buka konsol Cloud Storage.
- Pilih bucket Anda.
- Gunakan Upload files untuk mengupload skrip PySpark yang ingin Anda gunakan.
- Kirim tugas ke cluster Dataproc Anda:
- Buka konsol Dataproc.
- Buka tugas.
- Klik Kirim tugas.
- Isi detail pekerjaan.
- Di bagian Cluster, pilih cluster Anda.
- Di bagian Job, beri nama untuk ID tugas.
- Untuk Job type, pilih PySpark.
- Untuk File python utama, berikan URI gsutil dari skrip PySpark yang diupload
yang dimulai dengan
gs://
. - Untuk file Jar, pilih versi konektor Spark terbaru dari Maven, cari jar dengan dependensi dalam opsi download, lalu salin link-nya.
- Untuk Arguments, jika Anda menggunakan skrip PySpark lengkap dari
GitHub, masukkan
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID ; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang sudah selesai, biarkan kosong. - Di bagian Properties, masukkan kunci
spark.master
dan nilaiyarn
. - Klik Submit.
gcloud
Gunakan perintah gcloud dataproc jobs submit pyspark lagi untuk mengirimkan tugas ke Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: region Dataproc yang telah dipilih sebelumnya.--cluster
: nama cluster Dataproc.--jars
: jar uber Pub/Sub Lite Spark Connector dengan dependensi dalam bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.--driver-log-levels
: menyetel level logging ke INFO di tingkat root.--properties
: menggunakan pengelola resource YARN untuk master Spark.--
: memberikan argumen yang diperlukan untuk skrip.
Jika operasi readStream
berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di Konsol Google Cloud:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Memutar ulang dan menghapus permanen pesan dari Pub/Sub Lite
Operasi pencari tidak berfungsi saat membaca dari Pub/Sub Lite menggunakan Pub/Sub Lite Spark Connector karena sistem Apache Spark melakukan pelacakan offsetnya sendiri dalam partisi. Solusinya adalah menghabiskan, mencari, dan memulai ulang alur kerja.
Pembersihan
Agar akun Google Cloud Anda tidak dikenakan biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.
Hapus topik dan langganan.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Hapus cluster Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Hapus bucket Cloud Storage.
gsutil rb gs://$BUCKET
Langkah selanjutnya
Lihat contoh jumlah kata di Java untuk Pub/Sub Lite Spark Connector.
Pelajari cara mengakses output driver tugas Dataproc.
Konektor Spark lainnya oleh produk Google Cloud: konektor BigQuery, konektor Bigtable, konektor Cloud Storage.