Apache Kafka 是一个开源分布式流式平台,适用于实时数据流水线和数据集成。它提供了一个高效且可伸缩的流式传输系统,可用于各种应用,包括:
- 实时分析
- 流处理
- 日志汇总
- 分布式消息传递
- 事件流式传输
目标
在包含 ZooKeeper 的 Dataproc 高可用性集群(在本教程中称为“Dataproc Kafka 集群”)上安装 Kafka。
创建虚构的客户数据,然后将数据发布到 Kafka 主题。
在 Cloud Storage 中创建 Hive Parquet 和 ORC 表,以接收流式传输的 Kafka 主题数据。
提交 PySpark 作业,以订阅 Kafka 主题并将其以 Parquet 和 ORC 格式流式传输到 Cloud Storage。
对流式传输的 Hive 表数据运行查询,以统计流式传输的 Kafka 消息数。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
如果您尚未创建 Google Cloud 项目,请先创建一个。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
教程步骤
请按以下步骤创建 Dataproc Kafka 集群,以便将 Kafka 主题以 Parquet 或 ORC 格式读取到 Cloud Storage。
将 Kafka 安装脚本复制到 Cloud Storage
kafka.sh
初始化操作脚本会在 Dataproc 集群上安装 Kafka。
浏览代码。
将
kafka.sh
初始化操作脚本复制到您的 Cloud Storage 存储桶。此脚本会在 Dataproc 集群上安装 Kafka。打开 Cloud Shell,然后运行以下命令:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
进行以下替换:
- REGION:
kafka.sh
存储在 Cloud Storage 中带有地区标记的公共存储分区中。指定地理位置上靠近的 Compute Engine 区域(例如us-central1
)。 - BUCKET_NAME - Cloud Storage 存储桶的名称。
- REGION:
创建 Dataproc Kafka 集群
打开 Cloud Shell,然后运行以下
gcloud dataproc clusters create
命令,创建一个会安装 Kafka 和 ZooKeeper 组件的 Dataproc 高可用性集群:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注意:
- KAFKA_CLUSTER:集群名称,在项目中必须是唯一的。名称必须以小写字母开头,最多可包含 51 个小写字母、数字和连字符。不得以连字符结尾。已删除的集群的名称可以重复使用。
- PROJECT_ID:要与此集群关联的项目。
- REGION:集群所在的 Compute Engine 区域,例如
us-central1
。- 您可以添加可选的
--zone=ZONE
标志,以指定指定区域内的可用区,例如us-central1-a
。如果您未指定可用区,Dataproc 自动选择可用区展示位置功能会选择包含指定区域的可用区。
- 您可以添加可选的
--image-version
:建议使用 Dataproc 映像版本2.1-debian11
学习本教程。注意:每个映像版本都包含一组预安装的组件,包括本教程中使用的 Hive 组件(请参阅支持的 Dataproc 映像版本)。--num-master
:3
主节点创建一个高可用性集群。Kafka 所需的 ZooKeeper 组件已预安装在高可用性集群上。--enable-component-gateway
:启用 Dataproc 组件网关。- BUCKET_NAME:包含
/scripts/kafka.sh
初始化脚本的 Cloud Storage 存储桶的名称(请参阅将 Kafka 安装脚本复制到 Cloud Storage)。
创建 Kafka custdata
主题
如需在 Dataproc Kafka 集群上创建 Kafka 主题,请执行以下操作:
使用 SSH 实用程序在集群主服务器虚拟机上打开一个终端窗口。
创建 Kafka
custdata
主题。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注意:
KAFKA_CLUSTER:插入 Kafka 集群的名称。
-w-0:9092
表示在worker-0
节点的端口9092
上运行的 Kafka 代理。创建
custdata
主题后,您可以运行以下命令:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
将内容发布到 Kafka custdata
主题
以下脚本使用 kafka-console-producer.sh
Kafka 工具生成 CSV 格式的虚构客户数据。
复制该脚本,然后将其粘贴到 Kafka 集群主节点上的 SSH 终端中。按 <return> 运行脚本。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
注意:
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
运行以下 Kafka 命令,确认
custdata
主题包含 10,000 条消息。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注意:
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
预期输出:
custdata:0:10000
在 Cloud Storage 中创建 Hive 表
创建 Hive 表以接收流式传输的 Kafka 主题数据。请按以下步骤在 Cloud Storage 存储桶中创建 cust_parquet
(parquet) 和 cust_orc
(ORC) Hive 表。
在以下脚本中插入您的 BUCKET_NAME,然后将该脚本复制并粘贴到 Kafka 集群主服务器节点上的 SSH 终端,然后按 <return> 创建
~/hivetables.hql
(Hive 查询语言)脚本。您将在下一步中运行
~/hivetables.hql
脚本,以便在 Cloud Storage 存储桶中创建 Parquet 和 ORC Hive 表。cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
在 Kafka 集群的主节点的 SSH 终端中,提交
~/hivetables.hql
Hive 作业,以便在 Cloud Storage 存储桶中创建cust_parquet
(parquet) 和cust_orc
(ORC) Hive 表。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注意:
- Hive 组件已预安装在 Dataproc Kafka 集群上。如需查看最近发布的 2.1 映像中包含的 Hive 组件版本列表,请参阅 2.1.x 版本。
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
- REGION:Kafka 集群所在的区域。
将 Kafka custdata
流式传输到 Hive 表
- 在 Kafka 集群的主节点的 SSH 终端中,运行以下命令以安装
kafka-python
库。您需要 Kafka 客户端才能将 Kafka 主题数据流式传输到 Cloud Storage。
pip install kafka-python
插入 BUCKET_NAME,然后复制并将以下 PySpark 代码粘贴到 Kafka 集群主服务器节点上的 SSH 终端,然后按 <return> 创建
streamdata.py
文件。该脚本会订阅 Kafka
custdata
主题,然后将数据流式传输到 Cloud Storage 中的 Hive 表。输出格式(可以是 parquet 或 ORC)会作为参数传入脚本。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
在 Kafka 集群的主节点的 SSH 终端中,运行
spark-submit
以将数据流式传输到 Cloud Storage 中的 Hive 表。插入 KAFKA_CLUSTER 的名称和输出 FORMAT,然后将以下代码复制并粘贴到 Kafka 集群主节点上的 SSH 终端,然后按 <return> 运行代码,并将 Kafka
custdata
数据以 Parquet 格式流式传输到 Cloud Storage 中的 Hive 表。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
注意:
- KAFKA_CLUSTER:插入 Kafka 集群的名称。
- FORMAT:将
parquet
或orc
指定为输出格式。您可以连续运行该命令,将这两种格式的数据流式传输到 Hive 表:例如,在第一次调用中,指定parquet
以将 Kafkacustdata
主题流式传输到 Hive parquet 表;然后,在第二次调用中,指定orc
格式以将custdata
流式传输到 Hive ORC 表。
当 SSH 终端中的标准输出停止(表示所有
custdata
均已流式传输)后,请在 SSH 终端中按 <control-c> 停止该进程。列出 Cloud Storage 中的 Hive 表。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注意:
- BUCKET_NAME:插入包含 Hive 表的 Cloud Storage 存储桶的名称(请参阅创建 Hive 表)。
查询流式传输的数据
在 Kafka 集群的主节点上的 SSH 终端中,运行以下
hive
命令,以统计 Cloud Storage 中的 Hive 表中流式传输的 Kafkacustdata
消息的数量。hive -e "select count(1) from TABLE_NAME"
注意:
- TABLE_NAME:将
cust_parquet
或cust_orc
指定为 Hive 表名称。
预期输出代码段:
- TABLE_NAME:将
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
清理
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
删除资源
-
删除存储分区:
gcloud storage buckets delete BUCKET_NAME
- 删除 Kafka 集群:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}