使用 Apache Spark 写入 Pub/Sub Lite 消息

Pub/Sub Lite Spark 连接器是一个开源 Java 客户端库,支持使用 Pub/Sub Lite 作为 Apache Spark Structured Streaming 的输入和输出源。该连接器适用于所有 Apache Spark 发行版,包括 Dataproc。

本快速入门向您展示了如何执行以下操作:

  • 从 Pub/Sub 精简版读取消息
  • 向 Pub/Sub 精简版写入消息

使用 Dataproc Spark 集群中的 PySpark

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

设置

  1. 为您的项目创建变量。

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
  2. 创建 Cloud Storage 存储分区。 Cloud Storage 存储分区名称必须是全局唯一的。

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. 在受支持的位置创建 Pub/Sub 精简版主题和订阅。如果您使用的是 Pub/Sub 精简版预留,请参阅创建主题

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
  4. 创建 Dataproc 集群。

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    • --region:受支持的 Dataproc 区域,您的 Pub/Sub 精简版主题和订阅位于该区域。
    • --image-version集群的映像版本,用于确定集群上安装的 Apache Spark 版本。选择 2.x.x 映像发布版本,因为 Pub/Sub 精简版 Spark 连接器目前支持 Apache Spark 3.x.x。
    • --scopes:启用对同一项目中的 Google Cloud 服务的 API 访问权限。
    • --enable-component-gateway:启用对 Apache Spark 网页界面的访问权限。
    • --bucket:用于存储集群作业依赖项、驱动程序输出和集群配置文件的暂存 Cloud Storage 存储桶。
  5. 克隆快速入门代码库并导航到示例代码目录:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

写入 Pub/Sub 精简版

以下示例将执行以下操作:

  • 创建一个速率来源,用于生成格式为 spark.sql.Row 的连续数字和时间戳。
  • 使用 Pub/Sub 精简版 Spark 连接器的 writeStream API 转换数据以匹配所需的表架构
  • 将数据写入现有 Pub/Sub 精简版主题
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

要将写入作业提交到 Dataproc,请执行以下操作:

控制台

  1. 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
    1. 转到 Cloud Storage 控制台
    2. 选择您的存储桶。
    3. 使用上传文件来上传您要使用的 PySpark 脚本。
  2. 将作业提交到 Dataproc 集群:
    1. 转到 Dataproc 控制台
    2. 导航到作业。
    3. 点击提交作业
    4. 填写作业详细信息。
    5. 集群下,选择您的集群。
    6. 作业下,为作业 ID 指定名称。
    7. 对于作业类型,选择 PySpark。
    8. 对于主 Python 文件,请提供已上传 PySpark 脚本的 gcloud Storage URI,以 gs:// 开头。
    9. 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar,然后复制其链接。
    10. 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入 --project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--topic_id=TOPIC_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。
    11. 属性下,输入键 spark.master 和值 yarn
    12. 点击提交

gcloud

使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region:预先选择的 Dataproc 区域
  • --cluster:Dataproc 集群名称。
  • --jars:位于公共 Cloud Storage 存储桶中,包含依赖项的 Pub/Sub 精简版 Spark 连接器的 uber jar。您还可以访问此链接,从 Maven 下载包含依赖项的 uber jar。
  • --driver-log-levels:在根级别将日志记录级别设置为“信息”。
  • --properties:为 Spark 主实例使用 YARN 资源管理器。
  • --:提供脚本所需的参数。

如果 writeStream 操作成功,您应该会在本地以及 Google Cloud 控制台中的作业详情页面中看到如下日志消息:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

从 Pub/Sub 精简版读取

以下示例将使用 readStream API 从现有 Pub/Sub 精简版订阅中读取消息。连接器将输出符合格式为 spark.sql.Row 的固定表架构的消息。

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

要将读取作业提交到 Dataproc,请执行以下操作:

控制台

  1. 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
    1. 转到 Cloud Storage 控制台
    2. 选择您的存储桶。
    3. 使用上传文件来上传您要使用的 PySpark 脚本。
  2. 将作业提交到 Dataproc 集群:
    1. 转到 Dataproc 控制台
    2. 导航到作业。
    3. 点击提交作业
    4. 填写作业详细信息。
    5. 集群下,选择您的集群。
    6. 作业下,为作业 ID 指定名称。
    7. 对于作业类型,选择 PySpark。
    8. 对于主 Python 文件,请提供已上传 PySpark 脚本的 gcloud Storage URI,以 gs:// 开头。
    9. 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar,然后复制其链接。
    10. 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入 --project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--subscription_id=SUBSCRIPTION_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。
    11. 属性下,输入键 spark.master 和值 yarn
    12. 点击提交

gcloud

再次使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region:预先选择的 Dataproc 区域
  • --cluster:Dataproc 集群名称。
  • --jars:位于公共 Cloud Storage 存储桶中,包含依赖项的 Pub/Sub 精简版 Spark 连接器的 uber jar。您还可以访问此链接,从 Maven 下载包含依赖项的 uber jar。
  • --driver-log-levels:在根级别将日志记录级别设置为“信息”。
  • --properties:为 Spark 主实例使用 YARN 资源管理器。
  • --:为脚本提供所需的参数。

如果 readStream 操作成功,您应该会在本地以及 Google Cloud 控制台中的作业详情页面中看到如下日志消息:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

重放和清除 Pub/Sub 精简版消息

使用 Pub/Sub Lite Spark 连接器从 Pub/Sub Lite 读取时,跳转操作不起作用,因为 Apache Spark 系统会自行跟踪分区内偏移。解决方法是排空、还原和重启工作流。

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

  1. 删除主题和订阅。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. 删除 Dataproc 集群。

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. 移除 Cloud Storage 存储分区。

    gcloud storage rm gs://$BUCKET
    

后续步骤