Apache Kafka to Cloud Storage 模板是一种流处理流水线,可从 Google Cloud Managed Service for Apache Kafka 注入文本数据,并将记录输出到 Cloud Storage。
您还可以将 Apache Kafka to BigQuery 模板与自行管理的 Kafka 或外部 Kafka 搭配使用。
流水线要求
- Cloud Storage 输出存储桶必须存在。
- Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
- Apache Kafka 主题必须已存在。
Kafka 消息格式
Apache Kafka to Cloud Storage 模板支持以如下格式从 Kafka 读取消息:CONFLUENT_AVRO_WIRE_FORMAT
和 JSON
。
输出文件格式
输出文件格式与输入 Kafka 消息的格式相同。例如,如果您为 Kafka 消息格式选择 JSON,系统会将 JSON 文件写入输出 Cloud Storage 存储桶。
身份验证
Apache Kafka to Cloud Storage 模板支持对 Kafka 代理进行 SASL/PLAIN 身份验证。
模板参数
必需参数
- readBootstrapServerAndTopic:要从中读取输入的 Kafka 主题。
- kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用 NONE;如果使用 SASL/PLAIN 用户名和密码,请使用 SASL_PLAIN。Apache Kafka for BigQuery 仅支持 SASL_PLAIN 身份验证模式。默认值为:SASL_PLAIN。
- outputDirectory:用于写入输出文件的路径和文件名前缀。必须以斜杠结尾。 (示例:gs://your-bucket/your-path/)。
- messageFormat:要读取的 Kafka 消息的格式。支持的值包括 AVRO_CONFLUENT_WIRE_FORMAT(Confluent 架构注册表编码的 Avro)、AVRO_BINARY_ENCODING(普通二进制 Avro)和 JSON。默认值为:AVRO_CONFLUENT_WIRE_FORMAT。
可选参数
- windowDuration:将数据写入 Cloud Storage 的窗口时长/大小。允许的格式为 Ns(以秒为单位,例如 5s)、Nm(以分钟为单位,例如 12m)、Nh(以小时为单位,例如 2h)。 (示例:5m)。 默认值为 5m。
- outputFilenamePrefix:要在各窗口文件上放置的前缀。(示例:output-)。默认值为:output。
- numShards:写入时生成的输出分片数上限。分片数越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。默认值由 Dataflow 决定。
- enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
- consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
- kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为最新。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager 密钥 ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 用户名。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认为空。
- kafkaReadPasswordSecretId:Google Cloud Secret Manager 密钥 ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认为空。
- schemaFormat:Kafka 架构格式。可作为 SINGLE_SCHEMA_FILE 或 SCHEMA_REGISTRY 提供。如果指定了 SINGLE_SCHEMA_FILE,则所有消息都应具有 avro 架构文件中提及的架构。如果指定了 SCHEMA_REGISTRY,消息可以具有单个架构或多个架构。默认值为 SINGLE_SCHEMA_FILE。
- confluentAvroSchemaPath:用于解码主题中所有消息的单个 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
- schemaRegistryConnectionUrl:用于管理 Avro 架构以进行消息解码的 Confluent 架构注册表实例的网址。默认值为空。
- binaryAvroSchemaPath:用于解码二进制编码 Avro 消息的 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Kafka to Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 Cloud Storage。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。