Apache Kafka 适用于实时数据的开源分布式流式处理平台 流水线和数据集成它提供了一个高效且可伸缩的流式传输系统 可用于各种应用,包括:
- 实时分析
- 流处理
- 日志汇总
- 分布式消息传递
- 事件流式传输
目标
在 Dataproc 高可用性集群 。
创建虚构的客户数据,然后将数据发布到 Kafka 主题。
在 Cloud Storage 中创建 Hive Parquet 和 ORC 表,以接收流式传输的 Kafka 主题数据。
提交 PySpark 作业以订阅 Kafka 主题并将其流式传输到 采用 Parquet 和 ORC 格式的 Cloud Storage。
对流式插入的 Hive 表数据运行查询,以对流式插入的数据进行计数 Kafka 消息。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
如果您尚未创建 Google Cloud 项目,请先创建一个。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
教程步骤
执行以下步骤以创建 Dataproc Kafka 集群 以 Parquet OR ORC 格式将 Kafka 主题读取到 Cloud Storage 中。
将 Kafka 安装脚本复制到 Cloud Storage
kafka.sh
初始化操作
脚本在 Dataproc 集群上安装 Kafka。
浏览代码。
复制
kafka.sh
初始化操作 复制到您的 Cloud Storage 存储桶 此脚本会在 Dataproc 集群上安装 Kafka。打开 Cloud Shell,然后运行 以下命令:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
进行以下替换:
- REGION:
kafka.sh
存储在带有区域标记的公开中 Cloud Storage 中的存储分区。指定地理位置相近 Compute Engine 区域、 (例如:us-central1
)。 - BUCKET_NAME - Cloud Storage 存储桶的名称。
- REGION:
创建 Dataproc Kafka 集群
打开 Cloud Shell,然后运行 以下
gcloud dataproc clusters create
用于创建 Dataproc 高可用性集群 安装 Kafka 和 ZooKeeper 组件的集群:gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
注意:
- KAFKA_CLUSTER:集群名称,该名称在项目中必须是唯一的。 名称必须以小写字母开头,最多可以包含 51 个小写字母 字母、数字和连字符。但不能以连字符结尾。 可以重复使用的集群
- PROJECT_ID:要与此集群关联的项目。
- REGION:
Compute Engine 区域
集群所在的位置,例如
us-central1
。- 您可以添加可选的
--zone=ZONE
标志 以指定区域内的可用区 例如us-central1-a
。如果您未指定区域, Dataproc 自动选择地区 功能选择具有指定区域的可用区。
- 您可以添加可选的
--image-version
:Dataproc 映像版本2.1-debian11
本教程的两个示例 注意:每个映像版本都包含一组预安装的 包括本演示文稿中使用的 Hive 组件 教程(请参阅 支持的 Dataproc 映像版本)。--num-master
:3
个主节点 高可用性集群。 已预安装 Kafka 所需的 ZooKeeper 组件 高可用性集群上--enable-component-gateway
:启用 Dataproc 组件网关。- BUCKET_NAME:Cloud Storage 存储桶的名称
包含
/scripts/kafka.sh
初始化脚本 (请参阅将 Kafka 安装脚本复制到 Cloud Storage)。
创建 Kafka custdata
主题
如需在 Dataproc Kafka 集群上创建 Kafka 主题,请执行以下操作:
使用 SSH 实用程序打开终端窗口。
创建 Kafka
custdata
主题。/usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
注意:
KAFKA_CLUSTER:插入 Kafka 集群的名称。
-w-0:9092
表示 Kafka Broker 在worker-0
节点上的端口9092
。您可以在创建
custdata
主题后运行以下命令:# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
向 Kafka custdata
主题发布内容
以下脚本使用 kafka-console-producer.sh
Kafka 工具
以 CSV 格式生成虚构的客户数据。
复制该脚本,然后将其粘贴到 SSH 客户端中 终端上。新闻媒体 <return> 用于运行脚本。
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
注意:
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
运行以下 Kafka 命令以确认
custdata
主题包含 10,000 封邮件。/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
注意:
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
预期输出:
custdata:0:10000
在 Cloud Storage 中创建 Hive 表
创建 Hive 表以接收流式 Kafka 主题数据。
执行以下步骤以创建 cust_parquet
(parquet) 和
Cloud Storage 存储桶中的 cust_orc
(ORC) Hive 表。
在以下脚本中插入 BUCKET_NAME。 然后将脚本复制并粘贴到 Kafka 集群主服务器节点上的 SSH 终端中, 然后按 <return> 创建一个
~/hivetables.hql
(Hive 查询语言)脚本。您将运行
~/hivetables.hql
脚本 创建 Parquet 和 ORC Hive 表 存储在 Cloud Storage 存储桶中cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
在 Kafka 集群的主节点的 SSH 终端中,提交
~/hivetables.hql
Hive 作业,以便在 Cloud Storage 存储桶中创建cust_parquet
(parquet) 和cust_orc
(ORC) Hive 表。gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
注意:
- Hive 组件已预安装在 Dataproc Kafka 上 集群。如需查看最近发布的 2.1 映像中包含的 Hive 组件版本列表,请参阅 2.1.x 版本。
- KAFKA_CLUSTER:您的 Kafka 集群的名称。
- REGION:Kafka 集群所在的区域。
将 Kafka custdata
流式传输到 Hive 表
- 在主实例节点上的 SSH 终端中运行以下命令:
以安装
kafka-python
库。 需要 Kafka 客户端才能将 Kafka 主题数据流式传输到 Cloud Storage
pip install kafka-python
插入 BUCKET_NAME,然后复制并将以下 PySpark 代码粘贴到 Kafka 集群主节点上的 SSH 终端,然后按 <return> 创建
streamdata.py
文件。该脚本会订阅 Kafka
custdata
主题,然后将数据流式传输到 Cloud Storage 中的 Hive 表。输出格式 (可以是 Parquet 或 ORC)作为 参数。cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
在主实例节点上的 SSH 终端中, 您的 Kafka 集群,请运行
spark-submit
以将数据流式传输到 Cloud Storage 中的 Hive 表。插入 KAFKA_CLUSTER 的名称和输出 FORMAT,然后复制以下代码并将其粘贴到 SSH 中 终端,然后按 <return> 运行代码,并将 Parquet 格式的 Kafka
custdata
数据流式传输到 Cloud Storage 中的 Hive 表。spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
注意:
- KAFKA_CLUSTER:插入 Kafka 集群的名称。
- FORMAT:指定
parquet
或orc
作为 输出格式。您可以连续运行该命令,将这两种格式的数据流式传输到 Hive 表:例如,在第一次调用中,指定parquet
以将 Kafkacustdata
主题流式传输到 Hive parquet 表;然后,在第二次调用中,指定orc
格式以将custdata
流式传输到 Hive ORC 表。
当标准输出在 SSH 终端中停止后,这表示 已流式传输所有
custdata
,请按 在 SSH 终端中使用 <control-c> 停止该进程。列出 Cloud Storage 中的 Hive 表。
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
注意:
- BUCKET_NAME:插入 Cloud Storage 的名称 包含 Hive 表的存储桶(请参阅创建 Hive 表)。
查询流式传输的数据
在主实例节点上的 SSH 终端中, 您的 Kafka 集群,请运行以下
hive
命令 统计流式传输的 Kafkacustdata
消息 存储在 Cloud Storage 中的 Hive 表中hive -e "select count(1) from TABLE_NAME"
注意:
- TABLE_NAME:指定
cust_parquet
或cust_orc
作为 Hive 表名称。
预期输出代码段:
- TABLE_NAME:指定
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
清理
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
删除资源
-
删除存储分区:
gcloud storage buckets delete BUCKET_NAME
- 删除您的 Kafka 集群:
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}