工作流模板的参数化

如果您要使用不同的值多次运行工作流模板,则可以在模板中定义参数(参数化模板),以避免每次都要修改工作流。然后,您可以在每次运行模板时为参数传递不同的值。

可参数化的字段

以下 Dataproc 工作流模板字段可以参数化:

  • 标签
  • 文件 URI
  • 受管集群名称:Dataproc 将使用用户提供的名称作为名称前缀,并附加随机字符以创建唯一的集群名称。系统会在工作流结束时删除集群。
  • 作业属性
  • 作业参数
  • 脚本变量(在 HiveJob、SparkSqlJob 和 PigJob 中)
  • 主类(在 HadoopJob 和 SparkJob 中)
  • 地区(在 ClusterSelector 中)
  • 主实例或工作器实例组中的实例数 (numInstances)。

参数属性

工作流模板参数定义了以下必需属性和可选属性:

名称(必填)
一种目录式变量名称。在稍后为参数提供值时,此名称将用作键。
字段(必填)
将用此参数替换的字段列表(有关可参数化的字段列表,请参阅可参数化的字段)。每个字段都被指定为“字段路径”(请参阅字段路径语法,以了解用于指定字段路径的语法)。请注意,一个字段最多可出现在一个参数的字段路径列表中。
说明(可选)
参数的简要说明。
验证(可选)
用于验证参数值的规则,该值可以是以下各项之一:
  1. 一系列允许使用的值
  2. 值必须匹配的一系列正则表达式。

字段路径语法

字段路径的语法与 FieldMask 类似。例如,引用工作流模板集群选择器的地区字段的字段路径将被指定为 placement.clusterSelector.zone

字段路径可以使用以下语法来引用字段:

  • 受管集群名称:

    • placement.managedCluster.clusterName
  • 映射中的值可通过键进行引用,例如:

    • labels['key']
    • placement.clusterSelector.clusterLabels['key']
    • placement.managedCluster.labels['key']
    • jobs['step-id'].labels['key']
  • 作业列表中的作业可通过 step-id 进行引用。

    • jobs['step-id'].hadoopJob.mainJarFileUri
    • jobs['step-id'].hiveJob.queryFileUri
    • jobs['step-id'].pySparkJob.mainPythonFileUri
    • jobs['step-id'].hadoopJob.jarFileUris[0]
    • jobs['step-id'].hadoopJob.archiveUris[0]
    • jobs['step-id'].hadoopJob.fileUris[0]
    • jobs['step-id'].pySparkJob.pythonFileUris[0]

    • 重复字段中的项可以通过从零开始的索引进行引用,例如:

    • jobs['step-id'].sparkJob.args[0]

    • 其他示例:

    • jobs['step-id'].hadoopJob.args[0]

    • jobs['step-id'].hadoopJob.mainJarFileUri

    • jobs['step-id'].hadoopJob.properties['key']

    • jobs['step-id'].hiveJob.scriptVariables['key']

    • placement.clusterSelector.zone

不能对映射和重复字段进行整体参数化:目前,只能引用重复字段中的单个映射值和单个项。例如,以下字段路径无效:

placement.clusterSelector.clusterLabels
jobs['step-id'].sparkJob.args

将工作流模板参数化

您可以使用 Dataproc API 或 Google Cloud CLI 定义模板参数,从而对工作流模板进行参数化。

gcloud 命令

如需定义工作流模板参数,您可以创建或使用 Google Cloud CLI 导出并修改工作流模板 YAML 文件,然后使用 Google Cloud CLI 导入该文件来创建或更新模板。如需了解详情,请参阅使用 YAML 文件

示例 1:参数化受管集群模板示例

以下是具有四个已定义参数(LUSTER、NUM_ROWS、GEN_OUT 和 SoRT_OUT)的示例 Teragen-terasort 受管集群工作流模板 YAML 文件:列出了两个版本:一个是之前参数化,另一个是之后参数化。

之前

placement:
  managedCluster:
    clusterName: my-managed-cluster
    config:
      gceClusterConfig:
        zoneUri: us-central1-a
jobs:
- hadoopJob:
    args:
    - teragen
    - '10000'
    - hdfs:///gen/
    mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
  stepId: teragen
- hadoopJob:
    args:
    - terasort
    - hdfs:///gen/
    - hdfs:///sort/
    mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
  prerequisiteStepIds:
  - teragen
  stepId: terasort

之后

placement:
  managedCluster:
    clusterName: 'to-be-determined'
    config:
      gceClusterConfig:
        zoneUri: us-central1-a
jobs:
- hadoopJob:
    args:
    - teragen
    - '10000'
    - hdfs:///gen/
    mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
  stepId: teragen
- hadoopJob:
    args:
    - terasort
    - hdfs:///gen/
    - hdfs:///sort/
    mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
  prerequisiteStepIds:
  - teragen
  stepId: terasort
parameters:
- description: The managed cluster name prefix
  fields:
  - placement.managedCluster.clusterName
  name: CLUSTER
- description: The number of rows to generate
  fields:
  - jobs['teragen'].hadoopJob.args[1]
  name: NUM_ROWS
  validation:
    values:
      values:
      - '1000'
      - '10000'
      - '100000'
- description: Output directory for teragen
  fields:
  - jobs['teragen'].hadoopJob.args[2]
  - jobs['terasort'].hadoopJob.args[1]
  name: GEN_OUT
  validation:
    regex:
      regexes:
      - hdfs:///.*
- description: Output directory for terasort
  fields:
  - jobs['terasort'].hadoopJob.args[2]
  name: SORT_OUT
  validation:
    regex:
      regexes:
      - hdfs:///.*

示例 2:集群选择器工作流模板示例

以下是具有三个已定义参数(LUSTER、NUM_ROWS 和 OUTPUT_DIR)的参数化示例 teragen-terasort 集群选择器工作流模板 YAML 文件:

placement:
  clusterSelector:
    clusterLabels:
      goog-dataproc-cluster-name: 'to-be-determined'
jobs:
  - stepId: teragen
    hadoopJob:
      args:
      - 'teragen'
      - 'tbd number of rows'
      - 'tbd output directory'
parameters:
- name: CLUSTER
  fields:
  - placement.clusterSelector.clusterLabels['goog-dataproc-cluster-name']
- name: NUM_ROWS
  fields:
  - jobs['teragen'].hadoopJob.args[1]
- name: OUTPUT_DIR
  fields:
  - jobs['teragen'].hadoopJob.args[2]

在创建或编辑了使用模板参数定义工作流模板的 YAML 文件后,请使用以下 gcloud 命令导入 YAML 文件以创建或更新参数化模板。

gcloud dataproc workflow-templates import template-ID or template-name \
    --region=region \
    --source=template.yaml

您可以将 WorkflowTemplate id 或完全合格的模板资源 name(“projects/projectId/regions/region/workflowTemplates/template_id”)传递给该命令。具有相同模板名称的模板资源将会被覆盖(更新),版本号将递增。如果不存在名称相同的模板,则会创建这样的模板。

Rest API

您可以在 workflowTemplates.createworkflowTemplates.update API 请求中定义一个或多个 WorkflowTemplate.parameters

以下是一个 workflowTemplates.create 请求示例,用于创建具有四个已定义参数(CLUSTER、NUM_ROWS、GEN_OUT 和 SoRT_OUT)的 Teragen-terasort 工作流模板。

POST https://dataproc.googleapis.com/v1/projects/my-project/locations/us-central1/workflowTemplates
{
  "id": "my-template",
  "jobs": [
    {
      "stepId": "teragen",
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "teragen",
          "10000",
          "hdfs:///gen/"
        ]
      }
    },
    {
      "stepId": "terasort",
      "prerequisiteStepIds": [
        "teragen"
      ],
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "terasort",
          "hdfs:///gen/",
          "hdfs:///sort/"
        ]
      }
    }
  ],
  "parameters": [
    {
      "name": "CLUSTER",
      "fields": [
        "placement.managedCluster.clusterName"
      ],
      "description": "The managed cluster name prefix"
    },
    {
      "name": "NUM_ROWS",
      "fields": [
        "jobs['teragen'].hadoopJob.args[1]"
      ],
      "description": "The number of rows to generate",
      "validation": {
        "values": {
          "values": [
            "1000",
            "10000",
            "100000"
          ]
        }
      }
    },
    {
      "name": "GEN_OUT",
      "fields": [
        "jobs['teragen'].hadoopJob.args[2]",
        "jobs['terasort'].hadoopJob.args[1]"
      ],
      "description": "Output directory for teragen",
      "validation": {
        "regex": {
          "regexes": [
            "hdfs:///.*"
          ]
        }
      }
    },
    {
      "name": "SORT_OUT",
      "fields": [
        "jobs['terasort'].hadoopJob.args[2]"
      ],
      "description": "Output directory for terasort",
      "validation": {
        "regex": {
          "regexes": [
            "hdfs:///.*"
          ]
        }
      }
    }
  ],
  "placement": {
    "managedCluster": {
      "clusterName": "to-be-determined",
      "config": {
        "gceClusterConfig": {
          "zoneUri": "us-central1-a"
        }
      }
    }
  }
}

将参数传递给参数化模板

您可以在每次运行参数化工作流模板时传递一组不同的参数值。您必须为模板中定义的每个参数提供一个值。

gcloud 命令

您可以将参数名称的映射传递给带有 --parameters 标志的 gcloud dataproc workflow-templates instantiate 命令。必须提供模板中定义的所有参数值。提供的值将替换模板中指定的值。

参数化受管集群模板示例

gcloud dataproc workflow-templates instantiate my-template \
    --region=region \
    --parameters=CLUSTER=cluster,NUM_ROWS=1000,GEN_OUT=hdfs:///gen_20180601/,SORT_OUT=hdfs:///sort_20180601

参数化的集群选择器模板示例

gcloud dataproc workflow-templates instantiate \
  --parameters CLUSTER=my-cluster,NUM_ROWS=10000,OUTPUT_DIR=hdfs://some/dir
    

Rest API

您可以将参数 namesparameters 映射传递给 values 到 Dataproc workflowTemplates.instantiate API。必须提供模板中定义的所有参数值。提供的值将替换模板中指定的值。

例如:

POST https://dataproc.googleapis.com/v1/projects/my-project/regions/us-central1/workflowTemplates/my-template:instantiate
{
  "parameters": {
    "CLUSTER": "clusterA",
    "NUM_ROWS": "1000",
    "GEN_OUT": "hdfs:///gen_20180601/",
    "SORT_OUT": "hdfs:///sort_20180601/"
  }
}