Menulis pesan Pub/Sub Lite menggunakan Apache Spark

Konektor Pub/Sub Lite Spark adalah library klien Java open source yang mendukung penggunaan Pub/Sub Lite sebagai sumber input dan output untuk Streaming Terstruktur Apache Spark. Konektor berfungsi di semua distribusi Apache Spark, termasuk Dataproc.

Panduan memulai ini menunjukkan cara:

  • membaca pesan dari Pub/Sub Lite
  • menulis pesan ke Pub/Sub Lite

menggunakan PySpark dari cluster Dataproc Spark.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Mengaktifkan API

  5. Menginstal Google Cloud CLI.
  6. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  7. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  8. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  9. Aktifkan API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Mengaktifkan API

  10. Menginstal Google Cloud CLI.
  11. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init

Penyiapan

  1. Buat variabel untuk project Anda.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Membuat bucket Cloud Storage. Nama bucket Cloud Storage harus unik secara global.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Buat topik dan langganan Pub/Sub Lite di lokasi yang didukung. Lihat Membuat topik jika Anda menggunakan reservasi Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Buat cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: region Dataproc yang didukung tempat topik dan langganan Pub/Sub Lite Anda berada.
    • --image-version: versi image cluster, yang menentukan versi Apache Spark yang diinstal di cluster. Pilih versi rilis gambar 2.x.x karena Pub/Sub Lite Spark Connector saat ini mendukung Apache Spark 3.x.x.
    • --scopes: mengaktifkan akses API ke layanan Google Cloud di project yang sama.
    • --enable-component-gateway: mengaktifkan akses ke UI web Apache Spark.
    • --bucket: bucket Cloud Storage staging yang digunakan untuk menyimpan dependensi tugas cluster, output driver, dan file konfigurasi cluster.
  5. Clone repositori panduan memulai dan buka direktori kode contoh:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Menulis ke Pub/Sub Lite

Contoh berikut akan:

  • buat sumber tarif yang menghasilkan angka berurutan dan stempel waktu dengan format spark.sql.Row
  • ubah data agar sesuai dengan skema tabel yang diperlukan oleh writeStream API Pub/Sub Lite Spark Connector.
  • menulis data ke topik Pub/Sub Lite yang ada
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Untuk mengirimkan tugas tulis ke Dataproc:

Konsol

  1. Upload skrip PySpark ke bucket Cloud Storage Anda.
    1. Buka konsol Cloud Storage.
    2. Pilih bucket Anda.
    3. Gunakan Upload files untuk mengupload skrip PySpark yang ingin Anda gunakan.
  2. Kirim tugas ke cluster Dataproc Anda:
    1. Buka konsol Dataproc.
    2. Buka tugas.
    3. Klik Kirim tugas.
    4. Isi detail pekerjaan.
    5. Di bagian Cluster, pilih cluster Anda.
    6. Di bagian Job, beri nama untuk ID tugas.
    7. Untuk Job type, pilih PySpark.
    8. Untuk File python utama, berikan URI gsutil dari skrip PySpark yang diupload yang dimulai dengan gs://.
    9. Untuk file Jar, pilih versi konektor Spark terbaru dari Maven, cari jar dengan dependensi dalam opsi download, lalu salin link-nya.
    10. Untuk Arguments, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID ; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang sudah selesai, biarkan kosong.
    11. Di bagian Properties, masukkan kunci spark.master dan nilai yarn.
    12. Klik Submit.

gcloud

Gunakan perintah gcloud dataproc jobs submit pyspark untuk mengirim tugas ke Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: region Dataproc yang telah dipilih sebelumnya.
  • --cluster: nama cluster Dataproc.
  • --jars: jar uber Pub/Sub Lite Spark Connector dengan dependensi dalam bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.
  • --driver-log-levels: menyetel level logging ke INFO di tingkat root.
  • --properties: menggunakan pengelola resource YARN untuk master Spark.
  • --: memberikan argumen yang diperlukan oleh skrip.

Jika operasi writeStream berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di Konsol Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Membaca dari Pub/Sub Lite

Contoh berikut akan membaca pesan dari langganan Pub/Sub Lite yang ada menggunakan readStream API. Konektor akan menghasilkan pesan yang sesuai dengan skema tabel tetap yang diformat sebagai spark.sql.Row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Untuk mengirimkan tugas baca ke Dataproc:

Konsol

  1. Upload skrip PySpark ke bucket Cloud Storage Anda.
    1. Buka konsol Cloud Storage.
    2. Pilih bucket Anda.
    3. Gunakan Upload files untuk mengupload skrip PySpark yang ingin Anda gunakan.
  2. Kirim tugas ke cluster Dataproc Anda:
    1. Buka konsol Dataproc.
    2. Buka tugas.
    3. Klik Kirim tugas.
    4. Isi detail pekerjaan.
    5. Di bagian Cluster, pilih cluster Anda.
    6. Di bagian Job, beri nama untuk ID tugas.
    7. Untuk Job type, pilih PySpark.
    8. Untuk File python utama, berikan URI gsutil dari skrip PySpark yang diupload yang dimulai dengan gs://.
    9. Untuk file Jar, pilih versi konektor Spark terbaru dari Maven, cari jar dengan dependensi dalam opsi download, lalu salin link-nya.
    10. Untuk Arguments, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID ; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang sudah selesai, biarkan kosong.
    11. Di bagian Properties, masukkan kunci spark.master dan nilai yarn.
    12. Klik Submit.

gcloud

Gunakan perintah gcloud dataproc jobs submit pyspark lagi untuk mengirimkan tugas ke Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: region Dataproc yang telah dipilih sebelumnya.
  • --cluster: nama cluster Dataproc.
  • --jars: jar uber Pub/Sub Lite Spark Connector dengan dependensi dalam bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.
  • --driver-log-levels: menyetel level logging ke INFO di tingkat root.
  • --properties: menggunakan pengelola resource YARN untuk master Spark.
  • --: memberikan argumen yang diperlukan untuk skrip.

Jika operasi readStream berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di Konsol Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Memutar ulang dan menghapus permanen pesan dari Pub/Sub Lite

Operasi pencari tidak berfungsi saat membaca dari Pub/Sub Lite menggunakan Pub/Sub Lite Spark Connector karena sistem Apache Spark melakukan pelacakan offsetnya sendiri dalam partisi. Solusinya adalah menghabiskan, mencari, dan memulai ulang alur kerja.

Pembersihan

Agar akun Google Cloud Anda tidak dikenakan biaya untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

  1. Hapus topik dan langganan.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Hapus cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Hapus bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

Langkah selanjutnya