使用 Apache Spark 写入 Pub/Sub Lite 消息
Pub/Sub Lite Spark 连接器是一个开源 Java 客户端库,支持使用 Pub/Sub Lite 作为 Apache Spark Structured Streaming 的输入和输出源。该连接器适用于所有 Apache Spark 发行版,包括 Dataproc。
本快速入门向您展示了如何执行以下操作:
- 从 Pub/Sub 精简版读取消息
- 向 Pub/Sub 精简版写入消息
使用 Dataproc Spark 集群中的 PySpark。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
设置
为您的项目创建变量。
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
创建 Cloud Storage 存储分区。 Cloud Storage 存储分区名称必须是全局唯一的。
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
在受支持的位置创建 Pub/Sub 精简版主题和订阅。如果您使用的是 Pub/Sub 精简版预留,请参阅创建主题。
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
创建 Dataproc 集群。
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
:受支持的 Dataproc 区域,您的 Pub/Sub 精简版主题和订阅位于该区域。--image-version
:集群的映像版本,用于确定集群上安装的 Apache Spark 版本。选择 2.x.x 映像发布版本,因为 Pub/Sub 精简版 Spark 连接器目前支持 Apache Spark 3.x.x。--scopes
:启用对同一项目中的 Google Cloud 服务的 API 访问权限。--enable-component-gateway
:启用对 Apache Spark 网页界面的访问权限。--bucket
:用于存储集群作业依赖项、驱动程序输出和集群配置文件的暂存 Cloud Storage 存储桶。
克隆快速入门代码库并导航到示例代码目录:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
写入 Pub/Sub 精简版
以下示例将执行以下操作:
- 创建一个速率来源,用于生成格式为
spark.sql.Row
的连续数字和时间戳。 - 使用 Pub/Sub 精简版 Spark 连接器的
writeStream
API 转换数据以匹配所需的表架构 - 将数据写入现有 Pub/Sub 精简版主题
要将写入作业提交到 Dataproc,请执行以下操作:
控制台
- 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
- 转到 Cloud Storage 控制台。
- 选择您的存储桶。
- 使用上传文件来上传您要使用的 PySpark 脚本。
- 将作业提交到 Dataproc 集群:
- 转到 Dataproc 控制台。
- 导航到作业。
- 点击提交作业。
- 填写作业详细信息。
- 在集群下,选择您的集群。
- 在作业下,为作业 ID 指定名称。
- 对于作业类型,选择 PySpark。
- 对于主 Python 文件,请提供已上传 PySpark 脚本的 gcloud Storage URI,以
gs://
开头。 - 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar,然后复制其链接。
- 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--topic_id=
TOPIC_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。 - 在属性下,输入键
spark.master
和值yarn
。 - 点击提交。
gcloud
使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
如果 writeStream
操作成功,您应该会在本地以及 Google Cloud 控制台中的作业详情页面中看到如下日志消息:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
从 Pub/Sub 精简版读取
以下示例将使用 readStream
API 从现有 Pub/Sub 精简版订阅中读取消息。连接器将输出符合格式为 spark.sql.Row
的固定表架构的消息。
要将读取作业提交到 Dataproc,请执行以下操作:
控制台
- 将 PySpark 脚本上传到您的 Cloud Storage 存储桶。
- 转到 Cloud Storage 控制台。
- 选择您的存储桶。
- 使用上传文件来上传您要使用的 PySpark 脚本。
- 将作业提交到 Dataproc 集群:
- 转到 Dataproc 控制台。
- 导航到作业。
- 点击提交作业。
- 填写作业详细信息。
- 在集群下,选择您的集群。
- 在作业下,为作业 ID 指定名称。
- 对于作业类型,选择 PySpark。
- 对于主 Python 文件,请提供已上传 PySpark 脚本的 gcloud Storage URI,以
gs://
开头。 - 对于 Jar 文件,请从 Maven 中选择最新的 Spark 连接器版本,在下载选项中查找包含依赖项的 jar,然后复制其链接。
- 对于参数,如果您使用来自 GitHub 的完整 PySpark 脚本,请输入
--project_number=
PROJECT_NUMBER、--location=
PUBSUBLITE_LOCATION、--subscription_id=
SUBSCRIPTION_ID;如果您复制上面的 PySpark 脚本并完成待办事项,请将其留空。 - 在属性下,输入键
spark.master
和值yarn
。 - 点击提交。
gcloud
再次使用 gcloud dataproc jobs submit pyspark 命令将作业提交到 Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
如果 readStream
操作成功,您应该会在本地以及 Google Cloud 控制台中的作业详情页面中看到如下日志消息:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
重放和清除 Pub/Sub 精简版消息
使用 Pub/Sub Lite Spark 连接器从 Pub/Sub Lite 读取时,跳转操作不起作用,因为 Apache Spark 系统会自行跟踪分区内偏移。解决方法是排空、还原和重启工作流。
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
删除主题和订阅。
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
删除 Dataproc 集群。
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
移除 Cloud Storage 存储分区。
gcloud storage rm gs://$BUCKET
后续步骤
查看 Pub/Sub 精简版 Spark 连接器的 Java 版字数统计示例。
了解如何访问 Dataproc 作业驱动程序输出。
其他 Google Cloud 产品的 Spark 连接器:BigQuery 连接器、Bigtable 连接器、Cloud Storage 连接器。