快速入门:使用 Cloud 客户端库执行工作流

本快速入门介绍如何使用客户端库执行工作流和查看执行结果。

如需详细了解如何安装客户端库和设置开发环境,请参阅 Workflows 客户端库概览

准备工作

如果您的组织将限制应用于您的 Google Cloud 环境,则本文档中的某些步骤可能无法正常工作。在这种情况下,您可能无法完成创建公共 IP 地址或服务帐号密钥等任务。如果您发出的请求会返回有关限制条件的错误,请参阅如何在受限的 Google Cloud 环境中开发应用

  1. 本快速入门中的示例假设您已经部署了工作流 myFirstWorkflow。如果您尚未部署,请使用 Google Cloud Consolegcloud 命令行工具立即部署。
  2. 下载并安装 Git 源代码管理工具。

获取示例代码

  1. 将示例应用代码库克隆到本地机器:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    或者,您也可以下载该示例的 zip 文件并将其解压缩。

  2. 切换到包含工作流示例代码的目录:

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  3. 查看示例代码:

    Java

    
    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW = System.getenv().getOrDefault("WORKFLOW",
          "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
            "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    
    /**
     * Sleeps the process N number of milliseconds.
     * @param {Number} ms The number of milliseconds to sleep.
     */
    function sleep(ms) {
      return new Promise(resolve => {
        setTimeout(resolve, ms);
      });
    }
    
    // Execute workflow
    try {
      const createExecutionRes = await client.createExecution({
        parent: client.workflowPath(projectId, location, workflow),
      });
      const executionName = createExecutionRes[0].name;
      console.log(`Created execution: ${executionName}`);
    
      // Wait for execution to finish, then print results.
      let executionFinished = false;
      let backoffDelay = 1000; // Start wait with delay of 1,000 ms
      console.log('Poll every second for result...');
      while (!executionFinished) {
        const [execution] = await client.getExecution({
          name: executionName,
        });
        executionFinished = execution.state !== 'ACTIVE';
    
        // If we haven't seen the result yet, wait a second.
        if (!executionFinished) {
          console.log('- Waiting for results...');
          await sleep(backoffDelay);
          backoffDelay *= 2; // Double the delay to provide exponential backoff.
        } else {
          console.log(`Execution finished with state: ${execution.state}`);
          console.log(execution.result);
          return execution.result;
        }
      }
    } catch (e) {
      console.error(`Error executing workflow: ${e}`);
    }

    Python

    import time
    
    from google.cloud import workflows_v1beta
    from google.cloud.workflows import executions_v1beta
    from google.cloud.workflows.executions_v1beta.types import executions
    
    # TODO(developer): Uncomment these lines and replace with your values.
    # project = 'my-project-id'
    # location = 'us-central1'
    # workflow = 'myFirstWorkflow'
    
    if not project:
        raise Exception('GOOGLE_CLOUD_PROJECT env var is required.')
    
    # Set up API clients.
    execution_client = executions_v1beta.ExecutionsClient()
    workflows_client = workflows_v1beta.WorkflowsClient()
    
    # Construct the fully qualified location path.
    parent = workflows_client.workflow_path(project, location, workflow)
    
    # Execute the workflow.
    response = execution_client.create_execution(request={"parent": parent})
    print(f"Created execution: {response.name}")
    
    # Wait for execution to finish, then print results.
    execution_finished = False
    backoff_delay = 1  # Start wait with delay of 1 second
    print('Poll every second for result...')
    while (not execution_finished):
        execution = execution_client.get_execution(request={"name": response.name})
        execution_finished = execution.state != executions.Execution.State.ACTIVE
    
        # If we haven't seen the result yet, wait a second.
        if not execution_finished:
            print('- Waiting for results...')
            time.sleep(backoff_delay)
            backoff_delay *= 2  # Double the delay to provide exponential backoff.
        else:
            print(f'Execution finished with state: {execution.state.name}')
            print(execution.result)
            return execution.result

该程序将执行以下操作:

  1. 为工作流设置 Cloud 客户端库。
  2. 执行工作流。
  3. 轮询工作流的执行(使用指数退避算法),直到执行终止为止。
  4. 打印执行结果。

运行示例

  1. 要运行示例,请先安装依赖项:

    Java

    mvn compile

    Node.js

    npm install

    Python

    pip3 install -r requirements.txt

  2. 运行脚本:

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js

    node . PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    替换以下内容:

    • PROJECT_ID:(必需)Google Cloud 项目的 ID
    • CLOUD_REGION:工作流的位置(默认值:us-central1
    • WORKFLOW_NAME:替换为工作流的 ID(默认值:myFirstWorkflow

    输出内容类似如下:

    Execution finished with state: SUCCEEDED
    ["Sunday","Sunday in the Park with George","Sunday shopping","Sunday Bloody Sunday","Sunday Times Golden Globe Race","Sunday All Stars","Sunday Night (South Korean TV series)","Sunday Silence","Sunday Without God","Sunday Independent (Ireland)"]
    

清理

为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请按照以下步骤操作。

  1. 转到 Cloud Console 中的工作流页面。
    Workflows

  2. 从工作流列表中,点击工作流以转至其工作流详情页面。

  3. 点击删除

  4. 输入工作流的名称,然后点击确认

后续步骤