本文档简要介绍如何创建和部署 Dataflow 流水线来将数据从 Apache Kafka 流式传输到 BigQuery。
Apache Kafka 是一个用于流式传输事件的开源平台。Kafka 通常用于分布式架构,可在松散耦合的各组件之间实现通信。您可以使用 Dataflow 从 Kafka 读取事件、处理事件,并将结果写入 BigQuery 表以供进一步分析。
Google 提供了一个 Dataflow 模板,可用于配置 Kafka 到 BigQuery 的流水线。该模板使用 Apache Beam SDK 中提供的 BigQueryIO 连接器。
如需使用此模板,请执行以下步骤:
- 在 Google Cloud 或其他环境中部署 Kafka。
- 配置网络。
- 设置 Identity and Access Management (IAM) 权限。
- 编写用来转换事件数据的函数。
- 创建 BigQuery 输出表。
- 部署 Dataflow 模板。
部署 Kafka
在 Google Cloud 中,您可以在 Compute Engine 虚拟机实例上部署 Kafka 集群,也可以使用第三方代管的 Kafka 服务。如需详细了解 Google Cloud 上的部署选项,请参阅什么是 Apache Kafka?。您可以在 Google Cloud Marketplace 上找到第三方 Kafka 解决方案。
或者,您也可以使用自己位于 Google Cloud 外部的现有 Kafka 集群。例如,您可能有一个部署在本地或部署在其他公有云中的工作负载。
配置网络
默认情况下,Dataflow 会在您的默认 Virtual Private Cloud (VPC) 网络中启动实例。根据您的 Kafka 配置,您可能需要为 Dataflow 配置不同的网络和子网。如需了解详情,请参阅指定网络和子网。 配置网络时,请创建防火墙规则,允许 Dataflow 工作器机器访问 Kafka 代理。
如果您在使用 VPC Service Controls,请将 Kafka 集群放在 VPC Service Controls 边界内,或者将边界扩展到授权 VPN 或 Cloud Interconnect 范围。
如果您的 Kafka 集群部署在 Google Cloud 之外,您必须在 Dataflow 和 Kafka 集群之间创建网络连接。有几种网络选项可供您选择,它们各有利弊:
- 使用共享 RFC 1918 地址空间进行连接,方法是使用以下任一方法:
- 通过以下任一方法通过公共 IP 地址访问外部代管的 Kafka 集群:
专用互连是实现可预测性能和可靠性的最佳选择,但由于第三方必须预配新线路,因此设置专用互连可能需要更长时间。使用基于公共 IP 的拓扑,您可以快速开始,因为只需完成很少的网络操作。
接下来的两个部分更详细地介绍了这些选项。
共享的 RFC 1918 地址空间
使用专用互连和 IPsec VPN,均可以直接访问虚拟私有云 (VPC) 中的 RFC 1918 IP 地址,从而简化 Kafka 配置。如果您使用基于 VPN 的拓扑,请考虑设置高吞吐量 VPN。
默认情况下,Dataflow 会在您的默认 VPC 网络中启动实例。在专用网络拓扑中(其中的路由已在 Cloud Router 中明确定义并将 Google Cloud 中的子网连接到该 Kafka 集群),您需要更好地控制 Dataflow 实例的放置位置。您可以使用 Dataflow 配置 network
和 subnetwork
执行参数。
确保相应子网具有足够的 IP 地址,以便 Dataflow 在尝试横向扩容时可在这些地址上启动实例。此外,在创建用于启动 Dataflow 实例的单独网络时,请确保您具有可在项目中的所有虚拟机之间启用 TCP 流量的防火墙规则。默认网络已配置此防火墙规则。
公共 IP 地址空间
此架构使用传输层安全协议 (TLS) 来保护外部客户端与 Kafka 之间的流量,并使用未加密的流量进行代理间通信。当 Kafka 侦听器绑定到用于内部和外部通信的网络接口时,配置侦听器就非常简单了。但在许多情况下,集群中 Kafka broker 的外部通告地址将与 Kafka 使用的内部网络接口不同。在这种情况下,您可以使用 advertised.listeners
属性:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
外部客户端使用端口 9093 通过“SSL”通道连接,内部客户端使用端口 9092 通过明文通道连接。在 advertised.listeners
下指定地址时,请使用解析为外部和内部流量的相同实例的 DNS 名称(在此示例中为 kafkabroker-n.mydomain.com
)。使用公共 IP 地址可能不起作用,因为该地址可能无法解析内部流量。
设置 IAM 权限
Dataflow 作业使用两个 IAM 服务账号:
- Dataflow 服务使用 Dataflow 服务账号来处理 Google Cloud 资源,例如创建虚拟机。
- Dataflow 工作器虚拟机使用工作器服务账号来访问流水线的文件和其他资源。此服务账号需要对 BigQuery 输出表拥有写入权限;还需要对流水线作业引用的所有其他资源拥有相应的访问权限。
确保这两个服务账号均具有适当的角色。如需了解详情,请参阅 Dataflow 安全性和权限。
为 BigQuery 转换数据
Kafka-to-BigQuery 模板会创建一个流水线,来从一个或多个 Kafka 主题读取事件并将其写入 BigQuery 表中。您也可以视需要提供一个 JavaScript 用户定义函数 (UDF),在将事件数据写入 BigQuery 之前先用该函数转换数据。
流水线的输出必须是与输出表架构匹配的 JSON 格式数据。如果 Kafka 事件数据已经采用 JSON 格式,您可以创建具有匹配架构的 BigQuery 表,并将事件直接传递给 BigQuery。否则,您需要编写一个 UDF,接受事件数据作为输入并返回与 BigQuery 表匹配的 JSON 数据。
例如,假设事件数据包含两个字段:
name
(字符串)customer_id
(整数)
那么,Dataflow 流水线的输出可能如下所示:
{ "name": "Alice", "customer_id": 1234 }
如果事件数据当前尚未采用 JSON 格式,您可以编写一个如下所示的 UDF 来转换数据:
// UDF
function process(eventData) {
var name;
var customer_id;
// TODO Parse the event data to extract the name and customer_id fields.
// Return a JSON payload.
return JSON.stringify({ name: name, customer_id: customer_id });
}
UDF 可以对事件数据执行额外的处理,例如过滤事件、去除个人身份信息 (PII),或使用更多字段丰富数据。
如需详细了解如何为模板编写 UDF,请参阅使用 UDF 扩展 Dataflow 模板。将 JavaScript 文件上传到 Cloud Storage。
创建 BigQuery 输出表
在运行模板之前,您需要先创建 BigQuery 输出表。表架构必须与流水线的 JSON 输出兼容。对于 JSON 载荷中的每个属性,流水线都会将值写入 BigQuery 表中的同名列。JSON 中的任何缺失属性都会被解释为 NULL 值。
使用前面的示例,BigQuery 表将包含以下列:
列名 | 数据类型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
您可以使用 CREATE TABLE
SQL 语句创建表:
CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);
或者,您也可以使用 JSON 定义文件指定表架构。如需了解详情,请参阅 BigQuery 文档中的指定架构部分。
运行 Dataflow 作业
创建 BigQuery 表后,即可运行 Dataflow 模板。
控制台
如需使用 Google Cloud 控制台创建 Dataflow 作业,请执行以下步骤:
- 在 Google Cloud 控制台中,转到 Dataflow 页面。
- 点击基于模板创建作业。
- 在作业名称字段中,输入作业名称。
- 对于区域端点,选择一个区域。
- 选择“Kafka to BigQuery”模板。
- 在必需参数下,输入 BigQuery 输出表的名称。该表必须已存在且具有有效的架构。
点击显示可选参数,然后至少输入以下参数的值:
- 要从中读取输入的 Kafka 主题。
- 以英文逗号分隔的 Kafka 引导服务器列表。
- 服务账号电子邮件地址。
根据需要输入其他参数。特别是,您可能需要指定以下内容:
- 网络:如需使用非默认网络的 VPC 网络,请指定网络和子网。
- UDF:如需使用 JavaScript UDF,请指定脚本的 Cloud Storage 位置以及要调用的 JavaScript 函数的名称。
gcloud
如需使用 Google Cloud CLI 创建 Dataflow 作业,请运行以下命令:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters inputTopics=KAFKA_TOPICS \ --parameters bootstrapServers=BOOTSTRAP_SERVERS \ --parameters outputTableSpec=OUTPUT_TABLE \ --parameters serviceAccount=IAM_SERVICE_ACCOUNT \ --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \ --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \ --network VPC_NETWORK_NAME \ --subnetwork SUBNET_NAME
执行以下变量替换操作:
- JOB_NAME:您选择的作业名称。
- LOCATION:要在其中运行作业的区域。如需详细了解区域和位置,请参阅 Dataflow 位置。
- KAFKA_TOPICS:要读取的 Kafka 主题的英文逗号分隔列表。
- BOOTSTRAP_SERVERS:Kafka 引导服务器的英文逗号分隔列表。示例:
127:9092,127.0.0.1:9093
。 - OUTPUT_TABLE:BigQuery 输出表,按如下格式指定:PROJECT_ID:DATASET_NAME.TABLE_NAME。
示例:
my_project:dataset1.table1
。 - IAM_SERVICE_ACCOUNT。可选。运行作业所用的服务账号的电子邮件地址。
- UDF_SCRIPT_PATH。可选。UDF 所在 JavaScript 文件的 Cloud Storage 路径。示例:
gs://your-bucket/your-function.js
。 - UDF_FUNCTION_NAME。可选。要作为 UDF 进行调用的 JavaScript 函数的名称。
- VPC_NETWORK_NAME。可选。要将工作器分配到的网络。
- SUBNET_NAME。可选。要将工作器分配到的子网。
数据类型
本部分介绍如何处理 BigQuery 表架构中的各种数据类型。
在内部,JSON 消息会转换为 TableRow
对象,TableRow
字段值继而会转换为 BigQuery 类型。
标量类型
以下示例会创建包含各种标量数据类型(包括字符串、数值、布尔值、日期/时间、间隔和地理位置类型)的 BigQuery 表:
CREATE TABLE my_dataset.kafka_events (
string_col STRING,
integer_col INT64,
float_col FLOAT64,
decimal_col DECIMAL,
bool_col BOOL,
date_col DATE,
dt_col DATETIME,
ts_col TIMESTAMP,
interval_col INTERVAL,
geo_col GEOGRAPHY
);
下面则是包含兼容字段的 JSON 载荷:
{
"string_col": "string_val",
"integer_col": 10,
"float_col": 3.142,
"decimal_col": 5.2E11,
"bool_col": true,
"date_col": "2022-07-01",
"dt_col": "2022-07-01 12:00:00.00",
"ts_col": "2022-07-01T12:00:00.00Z",
"interval_col": "0-13 370 48:61:61",
"geo_col": "POINT(1 2)"
}
注意:
- 对于
TIMESTAMP
列,您可以使用 JavaScriptDate.toJSON
方法设置值的格式。 - 对于
GEOGRAPHY
列,您可以使用已知文本 (WKT) 或 GeoJSON 格式以字符串形式指定地理位置。如需了解详情,请参阅加载地理空间数据。
如需详细了解 BigQuery 中的数据类型,请参阅数据类型。
数组
您可以使用 ARRAY
数据类型在 BigQuery 中存储数组。在以下示例中,JSON 载荷包含一个名为 scores
的属性,该属性的值是一个 JSON 数组:
{"name":"Emily","scores":[10,7,10,9]}
以下 CREATE TABLE
SQL 语句会创建具有兼容架构的 BigQuery 表:
CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);
生成的表如下所示:
+-------+-------------+
| name | scores |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
结构
BigQuery 中的 STRUCT
数据类型包含命名字段的有序列表。您可以使用 STRUCT
来存放遵循一致架构的 JSON 对象。
在以下示例中,JSON 载荷包含一个名为 val
的属性,该属性的值是一个 JSON 对象:
{"name":"Emily","val":{"a":"yes","b":"no"}}
以下 CREATE TABLE
SQL 语句会创建具有兼容架构的 BigQuery 表:
CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);
生成的表如下所示:
+-------+----------------------+
| name | val |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
半结构化事件数据
如果 Kafka 事件数据未遵循严格的架构,请考虑将其作为 JSON
数据类型存储在 BigQuery 中(预览版功能)。如果将 JSON 数据存储为 JSON
数据类型,则无需事先定义事件架构。注入数据后,您可以使用字段访问运算符(通过点表示法)和数组访问运算符来查询输出表。
首先,创建一个包含 JSON
列的表:
-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);
然后,定义一个将事件载荷封装在 JSON 对象内的 JavaScript UDF:
// UDF
function process(eventData) {
var json;
// TODO Convert the event data to JSON.
return JSON.stringify({ "event_data": json });
}
在数据写入 BigQuery 后,您便可以使用字段访问运算符来查询各个字段。例如,以下查询返回每条记录的 name
字段的值:
SELECT event_data.name FROM my_dataset1.kafka_events;
如需详细了解如何在 BigQuery 中使用 JSON,请参阅使用 Google 标准 SQL 处理 JSON 数据。
错误和日志记录
在运行流水线或处理个别 Kafka 事件时,可能会出现错误。
如需详细了解如何处理流水线错误,请参阅流水线问题排查和调试。
如果作业成功运行,但在处理个别 Kafka 事件时发生错误,则流水线作业会将一条错误记录写入 BigQuery 中的表。作业本身不会失败,并且该事件级错误也不会在 Dataflow 作业日志中被报告为错误。
流水线作业会自动创建表来存放错误记录。默认情况下,该表的名称为“output_table_error_records”,其中“output_table”是输出表的名称。例如,如果输出表名为 kafka_events
,则错误表名为 kafka_events_error_records
。您可以通过设置 outputDeadletterTable
模板参数来指定其他名称:
outputDeadletterTable=my_project:dataset1.errors_table
可能的错误包括:
- 序列化错误(包括格式错误的 JSON)。
- 类型转换错误(由于表架构与 JSON 数据不匹配导致)。
- JSON 数据中包含的不存在于表架构中的额外字段。
错误消息示例:
错误类型 | 事件数据 | errorMessage |
---|---|---|
序列化错误 | "Hello world" | Failed to serialize json to table row: "Hello world" |
类型转换错误 | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
未知字段 | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
后续步骤
- 详细了解 Dataflow 模板。
- BigQuery 使用入门。