执行工作流

执行某个工作流会运行与该工作流关联的当前工作流定义。

您可以在工作流执行请求中传递运行时参数,并使用工作流变量访问这些参数。如需了解详情,请参阅在执行请求中传递运行时参数

工作流执行完成后,其历史记录和结果将保留有限的一段时间。如需了解详情,请参阅配额和限制

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 如果工作流访问其他 Google Cloud 资源,请确保该工作流与具有正确权限的服务帐号相关联。如需了解与现有工作流关联的服务帐号,请参阅验证工作流的关联服务帐号

    请注意,如需创建资源并关联服务帐号,您需要拥有创建该资源以及模拟您将附加到该资源的服务帐号的权限。如需了解详情,请参阅服务帐号权限

  7. 使用 Google Cloud 控制台Google Cloud CLI 部署工作流。

执行工作流

您可以使用客户端库、在 Google Cloud 控制台中、使用 gcloud CLI 或通过向 Workflows REST API 发送请求来执行工作流。

客户端库

以下示例假定您已部署工作流 myFirstWorkflow

  1. 安装客户端库并设置开发环境。如需了解详情,请参阅 Workflows 客户端库概览

  2. 将示例应用代码库克隆到本地机器:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  3. 切换到包含工作流示例代码的目录:

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  4. 查看示例代码:

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js (JavaScript)

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Node.js (TypeScript)

    import {ExecutionsClient} from '@google-cloud/workflows';
    const client: ExecutionsClient = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(
      projectId: string,
      location: string,
      workflow: string,
      searchTerm: string
    ) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms: number): Promise<unknown> {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(
      (err: Error) => {
        console.error(err.message);
        process.exitCode = 1;
      }
    );

    Python

    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    from google.cloud.workflows.executions_v1 import Execution
    from google.cloud.workflows.executions_v1.types import executions
    
    def execute_workflow(
        project: str, location: str = "us-central1", workflow: str = "myFirstWorkflow"
    ) -> Execution:
        """Execute a workflow and print the execution results.
    
        A workflow consists of a series of steps described using the Workflows syntax, and can be written in either YAML or JSON.
    
        Args:
            project: The Google Cloud project id which contains the workflow to execute.
            location: The location for the workflow
            workflow: The ID of the workflow to execute.
    
        Returns:
            The execution response.
        """
        # Set up API clients.
        execution_client = executions_v1.ExecutionsClient()
        workflows_client = workflows_v1.WorkflowsClient()
        # Construct the fully qualified location path.
        parent = workflows_client.workflow_path(project, location, workflow)
    
        # Execute the workflow.
        response = execution_client.create_execution(request={"parent": parent})
        print(f"Created execution: {response.name}")
    
        # Wait for execution to finish, then print results.
        execution_finished = False
        backoff_delay = 1  # Start wait with delay of 1 second
        print("Poll for result...")
        while not execution_finished:
            execution = execution_client.get_execution(request={"name": response.name})
            execution_finished = execution.state != executions.Execution.State.ACTIVE
    
            # If we haven't seen the result yet, wait a second.
            if not execution_finished:
                print("- Waiting for results...")
                time.sleep(backoff_delay)
                # Double the delay to provide exponential backoff.
                backoff_delay *= 2
            else:
                print(f"Execution finished with state: {execution.state.name}")
                print(f"Execution results: {execution.result}")
                return execution
    
    

    该示例将执行以下操作:

    1. 为工作流设置 Cloud 客户端库。
    2. 执行工作流。
    3. 轮询工作流的执行情况(使用指数退避算法),直到执行终止。
    4. 打印执行结果。
  5. 要运行示例,请先安装依赖项:

    Java

    mvn compile

    Node.js (JavaScript)

    npm install

    Node.js (TypeScript)

    npm install && npm run build

    Python

    pip3 install -r requirements.txt

  6. 运行脚本:

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js (JavaScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Node.js (TypeScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    替换以下内容:

    • PROJECT_ID(必需):Google Cloud 项目的 ID
    • CLOUD_REGION:工作流的位置(默认值:us-central1
    • WORKFLOW_NAME:工作流的 ID(默认值:myFirstWorkflow

    输出类似于以下内容:

    Execution finished with state: SUCCEEDED
    ["Sunday","Sunday in the Park with George","Sunday shopping","Sunday Bloody Sunday","Sunday Times Golden Globe Race","Sunday All Stars","Sunday Night (South Korean TV series)","Sunday Silence","Sunday Without God","Sunday Independent (Ireland)"]
    

控制台

  1. 如需执行工作流,请在 Google Cloud 控制台中转到工作流页面:

    进入 Workflows

  2. 工作流页面上,选择一个工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 执行工作流页面的输入窗格中,输入可选的运行时参数,以传递给工作流执行。参数必须采用 JSON 格式;例如 {"animal":"cat"}。如果您的工作流不使用运行时参数,请将此字段留空。

  5. (可选)指定要应用于工作流执行的通话记录的级别。在通话记录级别列表中,选择以下选项之一:

    • 未指定:未指定日志记录级别。这是默认设置。 除非未指定执行日志级别(默认),否则执行日志级别优先于任何工作流日志级别;在这种情况下,工作流日志级别适用。
    • 仅限错误:记录所有已捕获的异常;或者调用因异常而停止时。
    • 所有调用:记录对子工作流或库函数及其结果的所有调用。
    • 无日志:无调用日志记录。

  6. 点击执行

  7. 执行详情页面上,您可以查看执行结果,包括任何输出、执行 ID 和状态以及工作流执行的当前或最后一步。如需了解详情,请参阅访问工作流执行结果

gcloud

  1. 打开终端。

  2. 找到您要执行的工作流的名称。如果您不知道工作流的名称,则可以输入以下命令列出所有工作流:

    gcloud workflows list
    
  3. 您可以通过两种方式执行工作流:

    • 执行工作流并等待执行完成:

      gcloud workflows run WORKFLOW_NAME \
      --call-log-level=CALL_LOGGING_LEVEL \
      --data=DATA
      
    • 执行工作流,而无需等待执行尝试完成:

      gcloud workflows execute WORKFLOW_NAME \
      --call-log-level=CALL_LOGGING_LEVEL \
      --data=DATA
      

      替换以下内容:

      • WORKFLOW_NAME:工作流的名称。
      • CALL_LOGGING_LEVEL(可选):在执行期间应用的通话记录级别。可以是以下之一:

        • none:未指定日志记录级别。这是默认设置。除非未指定执行日志级别(默认),否则执行日志级别优先于任何工作流日志级别;在这种情况下,工作流日志级别适用。
        • log-errors-only:记录所有已捕获的异常;或者调用因异常而停止时。
        • log-all-calls:记录对子工作流或库函数及其调用的所有调用。
        • log-none:无调用日志记录。
      • DATA(可选):您的工作流的运行时参数,其采用 JSON 格式。

  4. 如果运行 gcloud workflows execute,则返回工作流执行尝试的唯一 ID,输出类似于以下内容:

     To view the workflow status, you can use following command:
     gcloud workflows executions describe b113b589-8eff-4968-b830-8d35696f0b33 --workflow workflow-2 --location us-central1

    如需查看执行状态,请输入上一步返回的命令。

如果执行尝试成功,输出类似于以下内容,state 表示工作流成功,status 表示执行的最后一个工作流步骤。

argument: '{"searchTerm":"Friday"}'
endTime: '2022-06-22T12:17:53.086073678Z'
name: projects/1051295516635/locations/us-central1/workflows/myFirstWorkflow/executions/c4dffd1f-13db-46a0-8a4a-ee39c144cb96
result: '["Friday","Friday the 13th (franchise)","Friday Night Lights (TV series)","Friday
    the 13th (1980 film)","Friday the 13th","Friday the 13th (2009 film)","Friday the
    13th Part III","Friday the 13th Part 2","Friday (Rebecca Black song)","Friday Night
    Lights (film)"]'
startTime: '2022-06-22T12:17:52.799387653Z'
state: SUCCEEDED
status:
    currentSteps:
    - routine: main
        step: returnOutput
workflowRevisionId: 000001-ac2

REST API

如需使用指定工作流的最新修订版本创建新执行,请使用 projects.locations.workflows.executions.create 方法。

请注意,如需进行身份验证,您需要一个具有足够权限的服务帐号来执行此工作流。例如,您可以为服务帐号授予 Workflows Invoker 角色 (roles/workflows.invoker),以便该帐号具有触发工作流执行的权限。如需了解详情,请参阅调用工作流

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_NUMBERIAM 和管理设置页面中列出的 Google Cloud 项目编号。
  • LOCATION:部署工作流的区域,例如 us-central1
  • WORKFLOW_ID:用户定义的工作流名称,例如 myFirstWorkflow
  • PARAMETER:可选。如果您要执行的工作流可以接收运行时参数,并将这些参数作为执行请求的一部分进行传递,则可以在请求正文中添加一个 JSON 格式的字符串,该字符串的值为一个或多个转义“参数-值”对(例如 "{\"searchTerm\":\"asia\"}")。
  • VALUE:可选。工作流可以作为运行时参数接收的参数-值对的值。
  • CALL_LOGGING_LEVEL:可选。 要在执行期间应用的调用日志记录级别。默认情况是未指定日志记录级别,而应用工作流日志级别。如需了解详情,请参阅将日志发送到 Logging。以下其中一列:
    • CALL_LOG_LEVEL_UNSPECIFIED:未指定日志记录级别,系统会应用工作流日志级别。这是默认设置。否则,系统会应用执行日志级别,其优先级高于工作流日志级别。
    • LOG_ERRORS_ONLY:记录捕获的所有异常;或者由于异常而停止调用。
    • LOG_ALL_CALLS:记录对子工作流或库函数的所有调用及其结果。
    • LOG_NONE:无通话记录。

请求 JSON 正文:

{
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "callLogLevel": "CALL_LOGGING_LEVEL"
}

如需发送您的请求,请展开以下选项之一:

如果成功,响应正文将包含一个新创建的 Execution 实例:

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/workflows/WORKFLOW_ID/executions/EXECUTION_ID",
  "startTime": "2023-11-07T14:35:27.215337069Z",
  "state": "ACTIVE",
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "workflowRevisionId": "000001-2df",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "status": {}
}

查看执行状态

您可以使用多个命令来帮助检查工作流执行的状态。

  • 如需检索工作流的执行尝试及其 ID 列表,请输入以下命令:

    gcloud workflows executions list WORKFLOW_NAME
    

    WORKFLOW_NAME 替换为工作流的名称。

    该命令会返回类似于以下内容的 NAME 值:

    projects/PROJECT_NUMBER/locations/REGION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID

    复制执行 ID 以在下一个命令中使用。

  • 如需检查执行尝试的状态并等待尝试完成,请输入以下命令:

    gcloud workflows executions wait EXECUTION_ID
    

    EXECUTION_ID 替换为执行尝试的 ID。

    该命令会等到执行尝试完成后,再返回结果。

  • 如需等到上次执行完成之后返回已完成执行的结果,请输入以下命令:

    gcloud workflows executions wait-last
    

    如果您之前在同一 gcloud 会话中尝试过,该命令会等待前一执行尝试完成,然后返回已完成执行的结果。如果不存在先前的尝试,gcloud 会返回以下错误:

    ERROR: (gcloud.workflows.executions.wait-last) [NOT FOUND] There are no cached executions available.
    
  • 如需获取上次执行的状态,请输入以下命令:

    gcloud workflows executions describe-last
    

    如果您曾在同一 gcloud 会话中尝试过执行该命令,则该命令会返回上次执行的结果,即使它正在运行也是如此。如果前一个尝试不存在,gcloud 将返回以下错误:

    ERROR: (gcloud.beta.workflows.executions.describe-last) [NOT FOUND] There are no cached executions available.
    

后续步骤