使用 Apache Spark 编写 Pub/Sub Lite 消息

Pub/Sub 精简版 Spark 连接器是一个开源 Java 客户端库,支持使用 Pub/Sub 精简版作为 Apache Spark Structured Streaming 的输入和输出源。该连接器适用于所有 Apache Spark 发行版,包括 Dataproc。

本快速入门向您展示了如何执行以下操作:

  • 从 Pub/Sub 精简版读取消息
  • 向 Pub/Sub 精简版写入消息

使用 Dataproc Spark 集群中的 PySpark

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Pub/Sub Lite、Dataproc、Cloud Storage、Logging API。

    启用 API

  5. 安装 Google Cloud CLI。
  6. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  7. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  8. 确保您的 Google Cloud 项目已启用结算功能

  9. 启用 Pub/Sub Lite、Dataproc、Cloud Storage、Logging API。

    启用 API

  10. 安装 Google Cloud CLI。
  11. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init

设置

  1. 为您的项目创建变量。

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. 创建 Cloud Storage 存储桶。 Cloud Storage 存储桶名称必须是全局唯一的。

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. 在受支持的位置创建 Pub/Sub Lite 主题和订阅。如果您使用 Pub/Sub 精简版预留,请参阅创建主题

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. 创建 Dataproc 集群。

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region:受支持的 Dataproc 区域,您的 Pub/Sub 精简版主题和订阅位于该区域。
    • --image-version集群的映像版本,用于确定集群上安装的 Apache Spark 版本。选择 2.x.x 映像发布版本,因为 Pub/Sub Lite Spark 连接器目前支持 Apache Spark 3.x.x。
    • --scopes:启用对同一项目中的 Google Cloud 服务的 API 访问权限。
    • --enable-component-gateway:启用对 Apache Spark 网页界面的访问权限。
    • --bucket:用于存储集群作业依赖项、驱动程序输出和集群配置文件的暂存 Cloud Storage 存储桶。
  5. 克隆快速入门代码库并导航到示例代码目录:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

写入 Pub/Sub 精简版

以下示例将执行以下操作:

  • 创建一个速率来源,用于生成格式为 spark.sql.Row 的连续数字和时间戳。
  • 转换数据以匹配 Pub/Sub 精简版 Spark 连接器的 writeStream API 所需的表架构
  • 将数据写入现有 Pub/Sub 精简版主题
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

要将写入作业提交到 Dataproc,请执行以下操作:

控制台

  1. 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
    1. 转到 Cloud Storage 控制台
    2. 选择您的存储桶。
    3. 使用上传文件来上传您要使用的 PySpark 脚本。
  2. 将作业提交到 Dataproc 集群:
    1. 转到 Dataproc 控制台
    2. 导航到作业。
    3. 点击提交作业
    4. 填写作业详细信息。
    5. 集群下,选择您的集群。
    6. 作业下,为作业 ID 指定名称。
    7. 对于作业类型,选择 PySpark。
    8. 对于主 Python 文件,请提供已上传 PySpark 脚本的 gsutil URI,以 gs:// 开头。
    9. 对于 Jar 文件,从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar 文件,并复制其链接。
    10. 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入 --project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--topic_id=TOPIC_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。
    11. 属性下,输入键 spark.master 和值 yarn
    12. 点击提交

gcloud

使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region:预先选择的 Dataproc 区域
  • --cluster:Dataproc 集群名称。
  • --jars:位于公共 Cloud Storage 存储桶中,包含依赖项的 Pub/Sub 精简版 Spark 连接器的 uber jar。您还可以访问此链接,从 Maven 下载包含依赖项的 uber jar。
  • --driver-log-levels:在根级别将日志记录级别设置为“信息”。
  • --properties:为 Spark 主实例使用 YARN 资源管理器。
  • --:提供脚本所需的参数。

如果 writeStream 操作成功,您应该会在本地以及 Google Cloud 控制台的作业详情页面中看到如下日志消息:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

从 Pub/Sub 精简版读取

以下示例将使用 readStream API 从现有 Pub/Sub 精简版订阅中读取消息。连接器将输出符合格式为 spark.sql.Row 的固定表架构的消息。

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

要将读取作业提交到 Dataproc,请执行以下操作:

控制台

  1. 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
    1. 转到 Cloud Storage 控制台
    2. 选择您的存储桶。
    3. 使用上传文件来上传您要使用的 PySpark 脚本。
  2. 将作业提交到 Dataproc 集群:
    1. 转到 Dataproc 控制台
    2. 导航到作业。
    3. 点击提交作业
    4. 填写作业详细信息。
    5. 集群下,选择您的集群。
    6. 作业下,为作业 ID 指定名称。
    7. 对于作业类型,选择 PySpark。
    8. 对于主 Python 文件,请提供已上传 PySpark 脚本的 gsutil URI,以 gs:// 开头。
    9. 对于 Jar 文件,从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar 文件,并复制其链接。
    10. 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入 --project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--subscription_id=SUBSCRIPTION_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。
    11. 属性下,输入键 spark.master 和值 yarn
    12. 点击提交

gcloud

再次使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region:预先选择的 Dataproc 区域
  • --cluster:Dataproc 集群名称。
  • --jars:位于公共 Cloud Storage 存储桶中,包含依赖项的 Pub/Sub 精简版 Spark 连接器的 uber jar。您还可以访问此链接,从 Maven 下载包含依赖项的 uber jar。
  • --driver-log-levels:在根级别将日志记录级别设置为“信息”。
  • --properties:为 Spark 主实例使用 YARN 资源管理器。
  • --:为脚本提供所需的参数。

如果 readStream 操作成功,您应该会在本地以及 Google Cloud 控制台的作业详情页面中看到如下日志消息:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

重放和完全清除 Pub/Sub Lite 中的消息

使用 Pub/Sub Lite Spark 连接器从 Pub/Sub Lite 读取数据时,还原操作不起作用,因为 Apache Spark 系统会自行跟踪分区内的偏移量。解决方法是排空、搜寻和重启工作流。

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

  1. 删除主题和订阅。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. 删除 Dataproc 集群。

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. 移除 Cloud Storage 存储桶。

    gsutil rb gs://$BUCKET
    

后续步骤