Apache Kafka to Cloud Storage 模板是一种流处理流水线,可从 Google Cloud Managed Service for Apache Kafka 注入文本数据,并将记录输出到 Cloud Storage。
您还可以将 Apache Kafka to BigQuery 模板与自行管理的 Kafka 或外部 Kafka 搭配使用。
流水线要求
- Cloud Storage 输出存储桶必须存在。
- Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
- Apache Kafka 主题必须已存在。
Kafka 消息格式
Apache Kafka to Cloud Storage 模板支持以如下格式从 Kafka 读取消息:CONFLUENT_AVRO_WIRE_FORMAT
和 JSON
。
输出文件格式
输出文件格式与输入 Kafka 消息的格式相同。例如,如果您为 Kafka 消息格式选择 JSON,系统会将 JSON 文件写入输出 Cloud Storage 存储桶。
身份验证
Apache Kafka to Cloud Storage 模板支持对 Kafka 代理进行 SASL/PLAIN 身份验证。
模板参数
必需参数
- readBootstrapServerAndTopic:要从中读取输入的 Kafka 主题。
- outputDirectory:用于写入输出文件的路径和文件名前缀。必须以斜杠结尾。 例如
gs://your-bucket/your-path/
。 - kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用
NONE
;如果使用 SASL/PLAIN 用户名和密码,请使用SASL_PLAIN
;如果使用基于证书的身份验证,请使用TLS
。Apache Kafka for BigQuery 仅支持SASL_PLAIN
身份验证模式。默认值为:SASL_PLAIN。 - messageFormat:要读取的 Kafka 消息的格式。支持的值包括
AVRO_CONFLUENT_WIRE_FORMAT
(Confluent 架构注册表编码的 Avro)、AVRO_BINARY_ENCODING
(普通二进制 Avro)和JSON
。默认值为:AVRO_CONFLUENT_WIRE_FORMAT。 - useBigQueryDLQ:如果为 true,系统会将失败的消息写入 BigQuery,并附带额外的错误信息。默认值为:false。
可选参数
- windowDuration:将数据写入 Cloud Storage 的窗口时长/大小。允许的格式为 Ns(以秒为单位,例如 5s)、Nm(以分钟为单位,例如 12m)、Nh(以小时为单位,例如 2h)。 例如
5m
。默认值为 5m。 - outputFilenamePrefix:要在各窗口文件上放置的前缀。例如
output-
。默认值:output。 - numShards:写入时生成的输出分片数上限。分片数越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。默认值由 Dataflow 决定。
- enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
- consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
- kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为:latest。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 用户名。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 密码。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含在向 Kafka 集群进行身份验证时使用的 TLS 证书和私钥。例如
gs://your-bucket/keystore.jks
。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka TLS 身份验证的密码,例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaFormat:Kafka 架构格式。可以作为
SINGLE_SCHEMA_FILE
或SCHEMA_REGISTRY
提供。如果指定了SINGLE_SCHEMA_FILE
,则对所有消息使用 avro 架构文件中提及的架构。如果指定了SCHEMA_REGISTRY
,消息可以具有单个架构或多个架构。默认值为 SINGLE_SCHEMA_FILE。 - confluentAvroSchemaPath:用于解码主题中所有消息的单个 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
- schemaRegistryConnectionUrl:用于管理 Avro 架构以进行消息解码的 Confluent 架构注册表实例的网址。默认值为空。
- binaryAvroSchemaPath:用于解码二进制编码 Avro 消息的 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
- schemaRegistryAuthenticationMode:架构注册表身份验证模式。可以是 NONE、TLS 或 OAUTH。默认为:NONE。
- schemaRegistryTruststoreLocation:SSL 证书的位置,用于存储用于对 Schema Registry 进行身份验证的信任库。例如
/your-bucket/truststore.jks
。 - schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用于存储访问信任库中 Secret 的密码。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeystoreLocation:包含 SSL 证书和私钥的密钥库位置。例如
/your-bucket/keystore.jks
。 - schemaRegistryKeystorePasswordSecretId:Secret Manager 中的 SecretId,其中包含用于访问密钥库文件的密码,例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeyPasswordSecretId:访问密钥库中存储的客户端私钥所需的密码的 SecretId,例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryOauthClientId:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的客户端 ID。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。
- schemaRegistryOauthClientSecretId:Google Cloud Secret Manager Secret ID,其中包含用于在 OAUTH 模式下对 Schema Registry 客户端进行身份验证的客户端 Secret。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaRegistryOauthScope:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的访问令牌范围。此字段是可选字段,因为您无需传递范围参数即可发出请求。例如
openid
。 - schemaRegistryOauthTokenEndpointUrl:OAuth/OIDC 身份提供程序的基于 HTTP(S) 的网址,用于在 OAUTH 模式下对架构注册表客户端进行身份验证。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。
- outputDeadletterTable:失败消息的完全限定 BigQuery 表名称。出于各种原因(例如,架构不匹配、JSON 格式错误)未能到达输出表的消息会写入该表。该表将由模板创建。例如
your-project-id:your-dataset.your-table-name
。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Kafka to Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 Cloud Storage。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。