Menulis pesan Pub/Sub Lite menggunakan Apache Spark

Konektor Spark Pub/Sub Lite adalah library klien Java open source yang mendukung penggunaan Pub/Sub Lite sebagai sumber input dan output untuk Streaming Terstruktur Apache Spark. Konektor berfungsi di semua distribusi Apache Spark, termasuk Dataproc.

Panduan memulai ini menunjukkan cara:

  • membaca pesan dari Pub/Sub Lite
  • menulis pesan ke Pub/Sub Lite

menggunakan PySpark dari cluster Spark Dataproc.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Siapkan

  1. Buat variabel untuk project Anda.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
  2. Membuat bucket Cloud Storage. Nama bucket Cloud Storage harus unik secara global.

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. Buat topik dan langganan Pub/Sub Lite di lokasi yang didukung. Lihat Membuat topik jika Anda menggunakan reservasi Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
  4. Buat cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    • --region: Region Dataproc yang didukung tempat topik dan langganan Pub/Sub Lite Anda berada.
    • --image-version: versi image cluster, yang menentukan versi Apache Spark yang diinstal di cluster. Pilih versi rilis image 2.x.x karena Konektor Spark Pub/Sub Lite saat ini mendukung Apache Spark 3.x.x.
    • --scopes: mengaktifkan akses API ke layanan Google Cloud dalam project yang sama.
    • --enable-component-gateway: mengaktifkan akses ke UI web Apache Spark.
    • --bucket: bucket Cloud Storage staging yang digunakan untuk menyimpan dependensi tugas cluster, output driver, dan file konfigurasi cluster.
  5. Clone repositori memulai cepat dan buka direktori kode sampel:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Menulis ke Pub/Sub Lite

Contoh berikut akan:

  • membuat sumber kapasitas yang menghasilkan angka dan stempel waktu berurutan yang diformat sebagai spark.sql.Row
  • mengubah data agar cocok dengan skema tabel yang diperlukan oleh API writeStream Konektor Spark Pub/Sub Lite
  • menulis data ke topik Pub/Sub Lite yang ada
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Untuk mengirimkan tugas tulis ke Dataproc:

Konsol

  1. Upload skrip PySpark ke bucket Cloud Storage Anda.
    1. Buka konsol Cloud Storage.
    2. Pilih bucket Anda.
    3. Gunakan Upload file untuk mengupload skrip PySpark yang ingin Anda gunakan.
  2. Kirim tugas ke cluster Dataproc Anda:
    1. Buka konsol Dataproc.
    2. Buka lowongan.
    3. Klik Kirim tugas.
    4. Isi detail pekerjaan.
    5. Di bagian Cluster, pilih cluster Anda.
    6. Di bagian Tugas, beri nama pada ID tugas.
    7. Untuk Jenis tugas, pilih PySpark.
    8. Untuk File python utama, berikan URI penyimpanan gcloud dari skrip PySpark yang diupload yang dimulai dengan gs://.
    9. Untuk file Jar, pilih versi konektor Spark terbaru dari Maven , cari jar dengan dependensi di opsi download, dan salin link-nya.
    10. Untuk Argumen, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang telah selesai, biarkan kosong.
    11. Di bagian Properties, masukkan kunci spark.master dan nilai yarn.
    12. Klik Kirim.

gcloud

Gunakan perintah gcloud dataproc jobs submit pyspark untuk mengirimkan tugas ke Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: region Dataproc yang telah dipilih sebelumnya.
  • --cluster: nama cluster Dataproc.
  • --jars: jar uber Pub/Sub Lite Spark Connector dengan dependensi di bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.
  • --driver-log-levels: menetapkan level logging ke INFO di tingkat root.
  • --properties: menggunakan pengelola resource YARN untuk master Spark.
  • --: memberikan argumen yang diperlukan oleh skrip.

Jika operasi writeStream berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di konsol Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Membaca dari Pub/Sub Lite

Contoh berikut akan membaca pesan dari langganan Pub/Sub Lite yang ada menggunakan API readStream. Konektor akan menghasilkan pesan yang sesuai dengan skema tabel tetap yang diformat sebagai spark.sql.Row .

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Untuk mengirimkan tugas baca ke Dataproc:

Konsol

  1. Upload skrip PySpark ke bucket Cloud Storage Anda.
    1. Buka konsol Cloud Storage.
    2. Pilih bucket Anda.
    3. Gunakan Upload file untuk mengupload skrip PySpark yang ingin Anda gunakan.
  2. Kirim tugas ke cluster Dataproc Anda:
    1. Buka konsol Dataproc.
    2. Buka lowongan.
    3. Klik Kirim tugas.
    4. Isi detail pekerjaan.
    5. Di bagian Cluster, pilih cluster Anda.
    6. Di bagian Tugas, beri nama pada ID tugas.
    7. Untuk Jenis tugas, pilih PySpark.
    8. Untuk File python utama, berikan URI penyimpanan gcloud dari skrip PySpark yang diupload yang dimulai dengan gs://.
    9. Untuk file Jar, pilih versi konektor Spark terbaru dari Maven , cari jar dengan dependensi di opsi download, dan salin link-nya.
    10. Untuk Argumen, jika Anda menggunakan skrip PySpark lengkap dari GitHub, masukkan --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID; jika Anda menyalin skrip PySpark di atas dengan daftar tugas yang telah selesai, biarkan kosong.
    11. Di bagian Properties, masukkan kunci spark.master dan nilai yarn.
    12. Klik Kirim.

gcloud

Gunakan perintah gcloud dataproc jobs submit pyspark lagi untuk mengirimkan tugas ke Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: region Dataproc yang telah dipilih sebelumnya.
  • --cluster: nama cluster Dataproc.
  • --jars: jar uber Pub/Sub Lite Spark Connector dengan dependensi di bucket Cloud Storage publik. Anda juga dapat membuka link ini untuk mendownload uber jar dengan dependensi dari Maven.
  • --driver-log-levels: menetapkan level logging ke INFO di tingkat root.
  • --properties: menggunakan pengelola resource YARN untuk master Spark.
  • --: memberikan argumen yang diperlukan untuk skrip.

Jika operasi readStream berhasil, Anda akan melihat pesan log seperti berikut secara lokal serta di halaman detail tugas di konsol Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Memutar ulang dan menghapus permanen pesan dari Pub/Sub Lite

Operasi pencarian tidak berfungsi saat membaca dari Pub/Sub Lite menggunakan Konektor Spark Pub/Sub Lite karena sistem Apache Spark melakukan pelacakan offsetnya sendiri dalam partisi. Solusinya adalah menghabiskan, mencari, dan memulai ulang alur kerja.

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, ikuti langkah-langkah berikut.

  1. Hapus topik dan langganan.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Hapus cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Hapus bucket Cloud Storage.

    gcloud storage rm gs://$BUCKET
    

Langkah selanjutnya