执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

您可以在工作流执行请求中传递运行时参数,并使用工作流变量访问这些参数。如需了解详情,请参阅 在执行请求中传递运行时参数

工作流执行完成后,其历史记录和结果将保留有限的一段时间。如需了解详情,请参阅配额和限制

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 如果工作流访问其他 Google Cloud 资源,请确保它与具有正确权限来执行此操作的服务账号相关联。如需了解与现有工作流关联的服务账号,请参阅验证工作流的关联服务账号

    请注意,要创建资源并关联服务账号,您需要 拥有创建该资源并模拟用来创建该服务账号的权限 您将附加到该资源如需了解详情,请参阅 服务账号权限

  7. 使用 Google Cloud 控制台Google Cloud CLI 部署工作流。

执行工作流

如需执行工作流,您可以使用 Google Cloud 控制台中的客户端库、gcloud CLI,或向 Workflows REST API 发送请求。

控制台

  1. 如需执行工作流,请在 Google Cloud 控制台中前往 工作流程页面:

    进入 Workflows

  2. Workflows 页面上,选择一个工作流以转到 详细信息页面。

  3. 工作流详情页面上,点击 执行

  4. 执行工作流页面的输入窗格中,输入可选的运行时参数,以传递给工作流执行。参数必须采用 JSON 格式;例如 {"animal":"cat"}。如果您的工作流不使用运行时参数,请将此字段留空。

  5. (可选)指定要应用于工作流执行的调用日志记录级别。在通话记录级别 列表中,请选择以下选项之一:

    • 未指定:未指定日志记录级别。这是默认设置。 除非未指定执行日志级别(默认),否则执行日志级别优先于任何工作流日志级别;在这种情况下,工作流日志级别适用。
    • 仅限错误:记录所有已捕获的异常;或者调用因异常而停止时。
    • 所有调用:记录对子工作流或库函数及其结果的所有调用。
    • 无日志:无调用日志记录。

  6. 点击执行

  7. 执行详情页面上,您可以查看执行结果,包括所有输出、执行 ID 和状态,以及工作流执行的当前步骤或最终步骤。如需更多信息 请参阅 访问工作流执行结果

gcloud

  1. 打开终端。

  2. 找到您要执行的工作流的名称。如果您不知道工作流的名称,则可以输入以下命令列出所有工作流:

    gcloud workflows list
  3. 您可以使用 gcloud workflows run 命令或 gcloud workflows execute 命令来执行工作流:

    • 执行工作流并等待执行完成:

      gcloud workflows run WORKFLOW_NAME \
          --call-log-level=CALL_LOGGING_LEVEL \
          --data=DATA
    • 执行工作流,而无需等待执行尝试完成:

      gcloud workflows execute WORKFLOW_NAME \
          --call-log-level=CALL_LOGGING_LEVEL \
          --data=DATA

      替换以下内容:

      • WORKFLOW_NAME:工作流的名称。
      • CALL_LOGGING_LEVEL(可选):在执行期间应用的调用日志记录级别。可以是以下之一:

        • none:未指定日志记录级别。这是默认设置。除非未指定执行日志级别(默认),否则执行日志级别优先于任何工作流日志级别;在这种情况下,工作流日志级别适用。
        • log-errors-only:记录所有已捕获的异常;或者调用因异常而停止时。
        • log-all-calls:记录对子工作流或库函数及其调用的所有调用。
        • log-none:无调用日志记录。
      • DATA(可选):您的工作流的运行时参数,其采用 JSON 格式。

  4. 如果运行 gcloud workflows execute,则返回工作流执行尝试的唯一 ID,输出类似于以下内容:

     To view the workflow status, you can use following command:
     gcloud workflows executions describe b113b589-8eff-4968-b830-8d35696f0b33 --workflow workflow-2 --location us-central1

    如需查看执行状态,请输入上一步返回的命令。

如果执行尝试成功,则输出内容类似如下,其中 state 表示工作流程成功,status 用于指定执行的最终工作流步骤。

argument: '{"searchTerm":"Friday"}'
endTime: '2022-06-22T12:17:53.086073678Z'
name: projects/1051295516635/locations/us-central1/workflows/myFirstWorkflow/executions/c4dffd1f-13db-46a0-8a4a-ee39c144cb96
result: '["Friday","Friday the 13th (franchise)","Friday Night Lights (TV series)","Friday
    the 13th (1980 film)","Friday the 13th","Friday the 13th (2009 film)","Friday the
    13th Part III","Friday the 13th Part 2","Friday (Rebecca Black song)","Friday Night
    Lights (film)"]'
startTime: '2022-06-22T12:17:52.799387653Z'
state: SUCCEEDED
status:
    currentSteps:
    - routine: main
        step: returnOutput
workflowRevisionId: 000001-ac2

客户端库

以下示例假定您已经部署了一个工作流, myFirstWorkflow

  1. 安装客户端库并设置开发环境。如需了解详情,请参阅 Workflows 客户端库概览

  2. 将示例应用代码库克隆到本地机器:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  3. 切换到包含工作流示例代码的目录:

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  4. 查看示例代码:

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js (JavaScript)

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Node.js (TypeScript)

    import {ExecutionsClient} from '@google-cloud/workflows';
    const client: ExecutionsClient = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(
      projectId: string,
      location: string,
      workflow: string,
      searchTerm: string
    ) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms: number): Promise<unknown> {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(
      (err: Error) => {
        console.error(err.message);
        process.exitCode = 1;
      }
    );

    Python

    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    from google.cloud.workflows.executions_v1 import Execution
    from google.cloud.workflows.executions_v1.types import executions
    
    
    def execute_workflow(
        project: str, location: str = "us-central1", workflow: str = "myFirstWorkflow"
    ) -> Execution:
        """Execute a workflow and print the execution results.
    
        A workflow consists of a series of steps described using the Workflows syntax, and can be written in either YAML or JSON.
    
        Args:
            project: The Google Cloud project id which contains the workflow to execute.
            location: The location for the workflow
            workflow: The ID of the workflow to execute.
    
        Returns:
            The execution response.
        """
        # Set up API clients.
        execution_client = executions_v1.ExecutionsClient()
        workflows_client = workflows_v1.WorkflowsClient()
        # Construct the fully qualified location path.
        parent = workflows_client.workflow_path(project, location, workflow)
    
        # Execute the workflow.
        response = execution_client.create_execution(request={"parent": parent})
        print(f"Created execution: {response.name}")
    
        # Wait for execution to finish, then print results.
        execution_finished = False
        backoff_delay = 1  # Start wait with delay of 1 second
        print("Poll for result...")
        while not execution_finished:
            execution = execution_client.get_execution(request={"name": response.name})
            execution_finished = execution.state != executions.Execution.State.ACTIVE
    
            # If we haven't seen the result yet, wait a second.
            if not execution_finished:
                print("- Waiting for results...")
                time.sleep(backoff_delay)
                # Double the delay to provide exponential backoff.
                backoff_delay *= 2
            else:
                print(f"Execution finished with state: {execution.state.name}")
                print(f"Execution results: {execution.result}")
                return execution
    
    

    该示例会执行以下操作:

    1. 为工作流设置 Cloud 客户端库。
    2. 执行工作流。
    3. 轮询工作流的执行情况(使用指数退避算法),直到 执行终止。
    4. 打印执行结果。
  5. 要运行示例,请先安装依赖项:

    Java

    mvn compile

    Node.js (JavaScript)

    npm install

    Node.js (TypeScript)

    npm install && npm run build

    Python

    pip3 install -r requirements.txt

  6. 运行脚本:

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js (JavaScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Node.js (TypeScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    替换以下内容:

    • PROJECT_ID(必需): Google Cloud 项目
    • CLOUD_REGION:工作流的位置(默认值:us-central1
    • WORKFLOW_NAME:工作流的 ID (默认值:myFirstWorkflow

    输出类似于以下内容:

    Execution finished with state: SUCCEEDED
    ["Sunday","Sunday in the Park with George","Sunday shopping","Sunday Bloody Sunday","Sunday Times Golden Globe Race","Sunday All Stars","Sunday Night (South Korean TV series)","Sunday Silence","Sunday Without God","Sunday Independent (Ireland)"]
    

REST API

如需使用给定工作流的最新修订版本创建新执行,请使用 该 projects.locations.workflows.executions.create 方法。

请注意,如需进行身份验证,您需要一个具有足够权限来执行工作流的服务账号。例如,您可以向服务账号授予 Workflows Invoker 角色 (roles/workflows.invoker),以便该账号有权触发您的工作流执行。如需了解详情,请参阅调用工作流

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_NUMBERIAM 和管理设置页面中列出的 Google Cloud 项目编号。
  • LOCATION:在其中部署此工作流的区域,例如 us-central1
  • WORKFLOW_NAME:用户定义的名称 例如 myFirstWorkflow
  • PARAMETER:可选。如果工作流 可以接收运行时参数,将其作为执行请求的一部分传递; 您可以向请求正文添加 JSON 格式的字符串,该字符串的值是一次或多次转义 “参数-值”对,例如 "{\"searchTerm\":\"asia\"}"
  • VALUE:可选。 参数值对。
  • CALL_LOGGING_LEVEL:可选。 执行期间要应用的调用日志记录级别。默认情况下,系统不会指定日志记录级别,而是会应用工作流日志级别。如需了解详情,请参阅将日志发送到 Logging。其中一个 以下:
    • CALL_LOG_LEVEL_UNSPECIFIED:未指定日志记录级别,且 工作流日志级别。这是默认设置。否则, 且优先级高于工作流日志级别。
    • LOG_ERRORS_ONLY:记录所有捕获的异常;或是在拨打 已因异常而停止。
    • LOG_ALL_CALLS:记录对子工作流或库函数的所有调用 及其结果。
    • LOG_NONE:无调用日志记录。
  • BACKLOG_EXECUTION:可选。如果设置为 true,则在并发配额耗尽时,执行作业不会积压。如需了解详情,请参阅 管理执行积压工作

请求 JSON 正文:

{
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "disableConcurrencyQuotaOverflowBuffering": "BACKLOG_EXECUTION"
}

如需发送您的请求,请展开以下选项之一:

如果成功,响应正文将包含一个新创建的 Execution 实例:

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID",
  "startTime": "2023-11-07T14:35:27.215337069Z",
  "state": "ACTIVE",
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "workflowRevisionId": "000001-2df",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "status": {}
}

查看执行操作的状态

您可以使用多个命令来帮助检查工作流执行的状态。

  • 如需检索工作流的执行尝试及其 ID 列表,请输入以下命令:

    gcloud workflows executions list WORKFLOW_NAME

    WORKFLOW_NAME 替换为工作流的名称。

    该命令会返回类似于以下内容的 NAME 值:

    projects/PROJECT_NUMBER/locations/REGION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID

    复制执行 ID 以在下一个命令中使用。

  • 如需检查执行尝试的状态并等待尝试完成,请输入以下命令:

    gcloud workflows executions wait EXECUTION_ID

    EXECUTION_ID 替换为执行尝试的 ID。

    该命令会等到执行尝试完成后,再返回结果。

  • 要等待上次执行完成,然后返回 完成执行,请输入以下命令:

    gcloud workflows executions wait-last

    如果您在同一个 gcloud 会话中进行了先前的执行尝试,该命令将等到前一个执行尝试完成后,再返回已完成的执行的结果。如果前一个尝试不存在,gcloud 将返回以下错误:

    ERROR: (gcloud.workflows.executions.wait-last) [NOT FOUND] There are no cached executions available.
    
  • 要获取上次执行的状态,请输入 以下命令:

    gcloud workflows executions describe-last

    如果您先前在同一 gcloud 会话中尝试过执行,则 命令返回上次执行的结果(即使该命令正在运行)。如果前一个尝试不存在,gcloud 将返回以下错误:

    ERROR: (gcloud.beta.workflows.executions.describe-last) [NOT FOUND] There are no cached executions available.
    

过滤执行

您可以将过滤器应用于 workflows.executions.list 方法

您可以按以下字段进行过滤:

  • createTime
  • disableOverflowBuffering
  • duration
  • endTime
  • executionId
  • label
  • startTime
  • state
  • stepName
  • workflowRevisionId

例如,如需按标签 (labels."fruit":"apple") 进行过滤,您可以发出类似于以下内容的 API 请求:

GET https://workflowexecutions.googleapis.com/v1/projects/MY_PROJECT/locations/MY_LOCATION/workflows/MY_WORKFLOW/executions?view=full&filter=labels.%22fruit%22%3A%22apple%22"

其中:

  • view=full 指定一个视图,用于定义应在返回的执行中填充哪些字段;在本例中,所有数据
  • labels.%22fruit%22%3A%22apple%22 是网址编码的过滤条件语法

如需了解详情,请参阅 AIP-160 过滤

管理执行回传

您可以使用执行回传来避免客户端重试、消除执行延迟并最大限度地提高吞吐量。积压的执行作业会在 。

活跃工作流的数量有上限 可同时运行的多个作业。当此配额用尽且执行回推已停用,或者达到回推执行的配额时,所有新执行都会失败并返回 HTTP 429 Too many requests 状态代码。启用执行积压后,新的执行会成功, 处于 QUEUED 状态。执行并发配额可用后,执行作业会自动运行并进入 ACTIVE 状态。

默认情况下,系统会为所有请求(包括那些 由 Cloud Tasks 触发)的情况除外:

  • 使用 executions.runexecutions.create 连接器,则执行积压默认处于停用状态。您可以 通过明确设置执行的 将 disableConcurrencyQuotaOverflowBuffering 字段更改为 false
  • 对于由 Pub/Sub 触发的执行,执行积压 已停用且无法配置。

请注意以下几点:

  • 已排队的执行在 我们将尽最大努力处理
  • createTime 时间戳字段指示执行作业的创建时间。通过 startTime 时间戳指示执行自动从 积压队列并开始运行。对于未积压的执行作业,这两个时间戳值相同。
  • 您可以使用 workflowexecutions.googleapis.com/executionbacklogentries 配额指标。对于 如需了解详情,请参阅查看和管理配额

停用执行积压

在使用 Google Cloud CLI例如:

gcloud workflows execute WORKFLOW_NAME
    --disable-concurrency-quota-overflow-buffering

或者,您也可以通过设置 请求 JSON 中的 disableConcurrencyQuotaOverflowBuffering 字段设置为 true 正文。 例如:

{
  "argument": {"arg1":"value1"},
  "callLogLevel": "LOG_NONE",
  "disableConcurrencyQuotaOverflowBuffering": true
}

如需了解详情,请参阅执行工作流

后续步骤