Pub/Sub to Datadog 模板

Pub/Sub to Datadog 模板是一种流处理流水线,用于从 Pub/Sub 订阅中读取消息,并使用 Datadog 端点将消息载荷写入 Datadog。此模板的最常见应用场景是将日志文件导出到 Datadog。

在写入 Datadog 之前,您可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

作为 API 密钥和密文的额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 API 密钥参数。如需详细了解如何对 API 密钥参数进行加密,请参阅 Cloud KMS API 加密端点

流水线要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Datadog 网址必须可从 Dataflow 工作器的网络访问。
  • Datadog API 密钥必须已生成并且可用。

模板参数

必需参数

  • inputSubscription:要从中读取输入的 Pub/Sub 订阅。(示例:projects/your-project-id/subscriptions/your-subscription-name)。
  • url:Datadog Logs API 网址。此网址必须可从运行流水线的 VPC 路由。如需了解详情,请参阅 Datadog 文档中的“发送日志”部分 (https://docs.datadoghq.com/api/latest/logs/#send-logs)。(示例:https://http-intake.logs.datadoghq.com)。
  • outputDeadletterTopic:用于转发无法递送的消息的 Pub/Sub 主题。例如,projects/<PROJECT_ID>/topics/<TOPIC_NAME>。

可选参数

  • apiKey:Datadog API 密钥。如果 apiKeySource 设置为 PLAINTEXTKMS,您必须提供此值。如需了解详情,请参阅 Datadog 文档中的“API 和应用密钥”部分 (https://docs.datadoghq.com/account_management/api-app-keys/)。
  • batchCount:向 Datadog 发送多个事件的批次大小。默认值为 1(无批处理)。
  • parallelism:最大并行请求数。默认值为 1(无并行)。
  • includePubsubMessage:是否在载荷中包含完整的 Pub/Sub 消息。默认值为 true(载荷中包含所有元素,包括数据元素)。
  • apiKeyKMSEncryptionKey:用于解密 API 密钥的 Cloud KMS 密钥。如果 apiKeySource 设置为 KMS,您必须提供此参数。如果提供了 Cloud KMS 密钥,则必须传入加密的 API 密钥。 (示例:projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • apiKeySecretId:API 密钥的 Secret Manager Secret ID。如果 apiKeySource 设置为 SECRET_MANAGER,您必须提供此参数。(示例:projects/your-project-id/secrets/your-secret/versions/your-secret-version)。
  • apiKeySource:API 密钥的来源。支持以下值:PLAINTEXTKMSSECRET_MANAGER。如果您使用的是 Secret Manager,则必须提供此参数。 如果 apiKeySource 设置为 KMS,您还必须提供 apiKeyKMSEncryptionKey 和加密 API Key。如果 apiKeySource 设置为 SECRET_MANAGER,则您还必须提供 apiKeySecretId。如果 apiKeySource 设置为 PLAINTEXT,则您还必须提供 apiKey
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:定义工作器检查 JavaScript UDF 更改以重新加载文件的时间间隔。默认值为 0。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:要发送到 Datadog 日志端点的事件数据。输出必须是字符串或字符串化 JSON 对象。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Datadog template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Datadog \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
apiKey=API_KEY,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • API_KEY:Datadog 的 API 密钥
  • URL:Datadog 的端点网址(例如 https://http-intake.logs.datadoghq.com
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Datadog 发送多个事件的批次大小
  • PARALLELISM:用于向 Datadog 发送事件的并行请求数

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Datadog
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "apiKey": "API_KEY",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • API_KEY:Datadog 的 API 密钥
  • URL:Datadog 的端点网址(例如 https://http-intake.logs.datadoghq.com
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Datadog 发送多个事件的批次大小
  • PARALLELISM:用于向 Datadog 发送事件的并行请求数

后续步骤