Apache Kafka to Kafka 模板

Apache Kafka to Apache Kafka 模板会创建一个流处理流水线,可从 Apache Kafka 来源以字节形式注入数据,然后将字节写入 Apache Kafka 接收器。

流水线要求

  • Apache Kafka 来源主题必须已存在。
  • Apache Kafka 来源和接收器代理服务器必须已在运行且可从 Dataflow 工作器机器进行访问。
  • 如果您将 Google Cloud Managed Service for Apache Kafka 用作来源或接收器,则该主题必须已存在才能启动模板。

Kafka 消息格式

Apache Kafka 来源消息会以字节形式读取,然后字节会写入 Apache Kafka 接收器。

身份验证

Apache Kafka to Apache Kafka 模板支持对 Kafka 代理进行 SASL/PLAINTLS 身份验证。

模板参数

必需参数

  • readBootstrapServerAndTopic:要从中读取输入的 Kafka 主题。
  • kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用 NONE;如果使用 SASL/PLAIN 用户名和密码,请使用 SASL_PLAIN;如果使用基于证书的身份验证,请使用 TLS。Apache Kafka for BigQuery 仅支持 SASL_PLAIN 身份验证模式。默认值为:SASL_PLAIN。
  • writeBootstrapServerAndTopic:要将输出写入的 Kafka 主题。
  • kafkaWriteAuthenticationMethod:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用 NONE;如果使用 SASL/PLAIN 用户名和密码,请使用 SASL_PLAIN;如果使用基于证书的身份验证,请使用 TLS。默认为:NONE。

可选参数

  • enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
  • consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
  • kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为:latest。
  • kafkaReadUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 用户名。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认值为空。
  • kafkaReadPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认值为空。
  • kafkaReadKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含在向 Kafka 集群进行身份验证时使用的 TLS 证书和私钥。(示例:gs://your-bucket/keystore.jks)。
  • kafkaReadTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
  • kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka TLS 身份验证的密码(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以进行 Kafka TLS 身份验证的密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaWriteUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含用于向目标 Kafka 集群进行 SASL_PLAIN 身份验证 Kafka 用户名。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认值为空。
  • kafkaWritePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于向目标 Kafka 集群进行 SASL_PLAIN 身份验证的 Kafka 密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。默认值为空。
  • kafkaWriteKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于向目标 Kafka 集群进行身份验证的 TLS 证书和私钥。(示例:gs://)。
  • kafkaWriteTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证目标 Kafka 代理身份的受信任证书。
  • kafkaWriteTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以向目标 Kafka 集群进行 TLS 身份验证的密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaWriteKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以向目标 Kafka 集群进行 TLS 身份验证的密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。
  • kafkaWriteKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以向目标 Kafka 集群进行 TLS 身份验证的密码。(示例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Kafka to Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
  8. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BIGQUERY_TABLE:您的 Cloud Storage 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅 gcloud topic escaping
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092。如果提供了多个地址,您需要转义英文逗号。请参阅 gcloud topic escaping

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BIGQUERY_TABLE:您的 Cloud Storage 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅 gcloud topic escaping
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092。如果提供了多个地址,您需要转义英文逗号。请参阅 gcloud topic escaping

如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 Cloud Storage

后续步骤