使用 Apache Spark 写入 Pub/Sub 精简版消息
Pub/Sub 精简版 Spark 连接器是一个开源 Java 客户端库,支持使用 Pub/Sub 精简版作为 Apache Spark Structured Streaming 的输入和输出源。该连接器适用于所有 Apache Spark 发行版,包括 Dataproc。
本快速入门向您展示了如何执行以下操作:
- 从 Pub/Sub 精简版读取消息
- 向 Pub/Sub 精简版写入消息
使用 Dataproc Spark 集群中的 PySpark。
准备工作
- 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
-
启用 Pub/Sub Lite、Dataproc、Cloud Storage、Logging API。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
-
启用 Pub/Sub Lite、Dataproc、Cloud Storage、Logging API。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
设置
为您的项目创建变量。
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
创建 Cloud Storage 存储桶。 Cloud Storage 存储桶名称必须是全局唯一的。
export BUCKET=your-bucket-name
gsutil mb gs://$BUCKET
在受支持的位置创建 Pub/Sub 精简版主题和订阅。如果您使用 Pub/Sub Lite 预留,请参阅创建主题。
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
创建 Dataproc 集群。
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
:受支持的 Dataproc 区域,您的 Pub/Sub 精简版主题和订阅位于该区域。--image-version
:集群的映像版本,用于确定集群上安装的 Apache Spark 版本。选择 2.x.x 映像发布版本,因为 Pub/Sub Lite Spark 连接器目前支持 Apache Spark 3.x.x。--scopes
:在同一项目中启用对 Google Cloud 服务的 API 访问权限。--enable-component-gateway
:启用对 Apache Spark 网页界面的访问权限。--bucket
:用于存储集群作业依赖项、驱动程序输出和集群配置文件的暂存 Cloud Storage 存储桶。
克隆快速入门代码库并导航到示例代码目录:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
写入 Pub/Sub 精简版
以下示例将执行以下操作:
- 创建一个速率来源,用于生成格式为
spark.sql.Row
的连续数字和时间戳。 - 转换数据以匹配 Pub/Sub 精简版 Spark 连接器的
writeStream
API 所需的表架构 - 将数据写入现有 Pub/Sub 精简版主题
要将写入作业提交到 Dataproc,请执行以下操作:
控制台
- 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
- 转到 Cloud Storage 控制台。
- 选择您的存储桶。
- 使用上传文件来上传您要使用的 PySpark 脚本。
- 将作业提交到 Dataproc 集群:
- 转到 Dataproc 控制台。
- 导航到作业。
- 点击提交作业。
- 填写作业详细信息。
- 在集群下,选择您的集群。
- 在作业下,为作业 ID 指定名称。
- 对于作业类型,选择 PySpark。
- 对于主 Python 文件,请提供已上传 PySpark 脚本的 gsutil URI,以
gs://
开头。 - 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 JAR,然后复制其链接。
- 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--topic_id=
TOPIC_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。 - 在属性下,输入键
spark.master
和值yarn
。 - 点击提交。
gcloud
使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
如果 writeStream
操作成功,您应该会在本地以及 Google Cloud 控制台的作业详细信息页面上看到如下所示的日志消息:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
从 Pub/Sub 精简版读取
以下示例将使用 readStream
API 从现有 Pub/Sub 精简版订阅中读取消息。连接器将输出符合格式为 spark.sql.Row
的固定表架构的消息。
要将读取作业提交到 Dataproc,请执行以下操作:
控制台
- 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
- 转到 Cloud Storage 控制台。
- 选择您的存储桶。
- 使用上传文件来上传您要使用的 PySpark 脚本。
- 将作业提交到 Dataproc 集群:
- 转到 Dataproc 控制台。
- 导航到作业。
- 点击提交作业。
- 填写作业详细信息。
- 在集群下,选择您的集群。
- 在作业下,为作业 ID 指定名称。
- 对于作业类型,选择 PySpark。
- 对于主 Python 文件,请提供已上传 PySpark 脚本的 gsutil URI,以
gs://
开头。 - 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 JAR,然后复制其链接。
- 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--subscription_id=
SUBSCRIPTION_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。 - 在属性下,输入键
spark.master
和值yarn
。 - 点击提交。
gcloud
再次使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
如果 readStream
操作成功,您应该会在本地以及 Google Cloud 控制台的作业详细信息页面上看到如下所示的日志消息:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
重放和完全清除 Pub/Sub Lite 中的消息
使用 Pub/Sub Lite Spark 连接器从 Pub/Sub Lite 读取数据时,跳转操作不起作用,因为 Apache Spark 系统会自行跟踪分区中的偏移量。临时解决方法是排空、寻找并重启工作流。
清理
为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。
删除主题和订阅。
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
删除 Dataproc 集群。
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
移除 Cloud Storage 存储桶。
gsutil rb gs://$BUCKET
后续步骤
查看 Pub/Sub 精简版 Spark 连接器的 Java 版字数统计示例。
了解如何访问 Dataproc 作业驱动程序输出。
Google Cloud 产品的其他 Spark 连接器:BigQuery 连接器、Bigtable 连接器、Cloud Storage 连接器。