Apache Kafka to Apache Kafka 模板会创建一个流处理流水线,可从 Apache Kafka 来源以字节形式注入数据,然后将字节写入 Apache Kafka 接收器。
流水线要求
- Apache Kafka 来源主题必须已存在。
- Apache Kafka 来源和接收器代理服务器必须已在运行且可从 Dataflow 工作器机器进行访问。
- 如果您将 Google Cloud Managed Service for Apache Kafka 用作来源或接收器,则该主题必须已存在才能启动模板。
Kafka 消息格式
Apache Kafka 来源消息会以字节形式读取,然后字节会写入 Apache Kafka 接收器。
身份验证
Apache Kafka to Apache Kafka 模板支持对 Kafka 代理进行 SASL/PLAIN 和 TLS 身份验证。
模板参数
必需参数
- readBootstrapServerAndTopic:要从中读取输入的 Kafka 引导服务器和主题。例如
localhost:9092;topic1,topic2
。 - kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用
NONE
;如果使用 SASL/PLAIN 用户名和密码,请使用SASL_PLAIN
;如果使用基于证书的身份验证,请使用TLS
。Apache Kafka for BigQuery 仅支持SASL_PLAIN
身份验证模式。默认值为:SASL_PLAIN。 - writeBootstrapServerAndTopic:要将输出写入的 Kafka 主题。
- kafkaWriteAuthenticationMethod:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用 NONE;如果使用 SASL/PLAIN 用户名和密码,请使用 SASL_PLAIN;如果使用基于证书的身份验证,请使用 TLS。默认值为:APPLICATION_DEFAULT_CREDENTIALS。
可选参数
- enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
- consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
- kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为:latest。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 用户名。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 密码。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含在向 Kafka 集群进行身份验证时使用的 TLS 证书和私钥。例如
gs://your-bucket/keystore.jks
。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka TLS 身份验证的密码,例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含用于向目标 Kafka 集群进行 SASL_PLAIN 身份验证的 Kafka 用户名。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaWritePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于向目标 Kafka 集群进行 SASL_PLAIN 身份验证的 Kafka 密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaWriteKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于向目标 Kafka 集群进行身份验证的 TLS 证书和私钥。例如
gs://<BUCKET>/<KEYSTORE>.jks
。 - kafkaWriteTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证目标 Kafka 代理身份的受信任证书。
- kafkaWriteTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以向目标 Kafka 集群进行 TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以向目标 Kafka 集群进行 TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaWriteKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以向目标 Kafka 集群进行 TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Kafka to Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BIGQUERY_TABLE
:您的 Cloud Storage 表名称KAFKA_TOPICS
:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅gcloud topic escaping
。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。KAFKA_SERVER_ADDRESSES
:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092
。如果提供了多个地址,您需要转义英文逗号。请参阅gcloud topic escaping
。
如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 Cloud Storage。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。