Pub/Sub to Datadog 模板

Pub/Sub to Datadog 模板是一种流处理流水线,用于从 Pub/Sub 订阅中读取消息,并使用 Datadog 端点将消息载荷写入 Datadog。此模板的最常见应用场景是将日志文件导出到 Datadog。

在写入 Datadog 之前,您可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

作为 API 密钥和密文的额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 API 密钥参数。如需详细了解如何对 API 密钥参数进行加密,请参阅 Cloud KMS API 加密端点

流水线要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Datadog 网址必须可从 Dataflow 工作器的网络访问。
  • Datadog API 密钥必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅。例如 projects/<project-id>/subscriptions/<subscription-name>
url Datadog Logs API 网址。此网址必须可从运行流水线的 VPC 路由。例如 https://http-intake.logs.datadoghq.com。如需了解详情,请参阅 Datadog 文档中的发送日志
apiKeySource API 密钥的来源。支持以下值:PLAINTEXTKMSSECRET_MANAGER。如果您使用的是 Secret Manager,则必须提供此参数。如果 apiKeySource 设置为 KMS,您还必须提供 apiKeyKMSEncryptionKey 和加密 API Key。如果 apiKeySource 设置为 SECRET_MANAGER,则您还必须提供 apiKeySecretId。 如果 apiKeySource 设置为 PLAINTEXT,则您还必须提供 apiKey
apiKeyKMSEncryptionKey 可选:用于解密 API 密钥的 Cloud KMS 密钥。如果 apiKeySource 设置为 KMS,您必须提供此参数。如果提供了 Cloud KMS 密钥,则必须传入加密的 API 密钥。
apiKey 可选:Datadog API 密钥。如果 apiKeySource 设置为 PLAINTEXTKMS,您必须提供此值。如需了解详情,请参阅 Datadog 文档中的 API 和应用密钥
apiKeySecretId 可选:API 密钥的 Secret Manager Secret ID。如果 apiKeySource 设置为 SECRET_MANAGER,您必须提供此参数。请使用格式 projects/<project-id>/secrets/<secret-name>/versions/<secret-version>
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath 可选:.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName 可选: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
javascriptTextTransformReloadIntervalMinutes 可选:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件遭到修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0。
batchCount 可选:向 Datadog 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism 可选:最大并行请求数。默认值为 1(无并行)。
includePubsubMessage 可选:在载荷中包含完整的 Pub/Sub 消息。默认值为 false(载荷中仅包含数据元素)。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:要发送到 Datadog 日志端点的事件数据。输出必须是字符串或字符串化 JSON 对象。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Datadog template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Datadog \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
apiKey=API_KEY,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • API_KEY:Datadog 的 API 密钥
  • URL:Datadog 的端点网址(例如 https://http-intake.logs.datadoghq.com
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Datadog 发送多个事件的批次大小
  • PARALLELISM:用于向 Datadog 发送事件的并行请求数

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Datadog
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "apiKey": "API_KEY",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM"
   }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • API_KEY:Datadog 的 API 密钥
  • URL:Datadog 的端点网址(例如 https://http-intake.logs.datadoghq.com
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Datadog 发送多个事件的批次大小
  • PARALLELISM:用于向 Datadog 发送事件的并行请求数

后续步骤