クライアント ライブラリを使用してワークフローを実行する

クライアント ライブラリを使用してワークフローを実行します。実行が終了し、結果を出力するまで、指数バックオフを使用してワークフローの実行をポーリングします。

もっと見る

このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。

コードサンプル

Java

このサンプルを試す前に、クライアント ライブラリを使用した Workflows クイックスタートの Java の手順に沿って設定を行ってください。詳細については、Workflows Java API のリファレンス ドキュメントをご覧ください。

Workflows に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

// Imports the Google Cloud client library

import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
import com.google.cloud.workflows.executions.v1.Execution;
import com.google.cloud.workflows.executions.v1.ExecutionsClient;
import com.google.cloud.workflows.executions.v1.WorkflowName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
 * Quickstart sample: creates a workflow execution and polls it with exponential backoff until it
 * leaves the {@code ACTIVE} state or the polling budget is exhausted.
 */
public class WorkflowsQuickstart {

  private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
  private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
  private static final String WORKFLOW =
      System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");

  /**
   * Runs the quickstart using the {@code GOOGLE_CLOUD_PROJECT} (required), {@code LOCATION}
   * (default {@code us-central1}) and {@code WORKFLOW} (default {@code myFirstWorkflow})
   * environment variables.
   *
   * @param args unused
   * @throws IllegalArgumentException if {@code GOOGLE_CLOUD_PROJECT} is not set
   */
  public static void main(String... args)
      throws IOException, InterruptedException, ExecutionException {
    if (PROJECT == null) {
      throw new IllegalArgumentException(
          "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
    }
    workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
  }

  /**
   * Creates an execution of the given workflow, then polls it until it is no longer {@code
   * ACTIVE}, printing the final state and result. Gives up, with a message, once the total time
   * spent sleeping reaches ten minutes.
   *
   * @param projectId the Google Cloud project that contains the workflow
   * @param location the location of the workflow
   * @param workflow the ID of the workflow to execute
   */
  public static void workflowsQuickstart(String projectId, String location, String workflow)
      throws IOException, InterruptedException, ExecutionException {
    // Initialize client that will be used to send requests. This client only needs
    // to be created once, and can be reused for multiple requests. The try-with-resources
    // statement closes it when the block exits.
    try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
      // Construct the fully qualified workflow path.
      WorkflowName parent = WorkflowName.of(projectId, location, workflow);

      // Creates the execution object.
      CreateExecutionRequest request =
          CreateExecutionRequest.newBuilder()
              .setParent(parent.toString())
              .setExecution(Execution.newBuilder().build())
              .build();
      Execution response = executionsClient.createExecution(request);

      String executionName = response.getName();
      System.out.printf("Created execution: %s%n", executionName);

      // Completion flag is local to this call so that repeated invocations in the same JVM
      // each poll their own execution.
      boolean finished = false;
      long backoffTime = 0; // Total time spent sleeping so far, in ms
      long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
      final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
      System.out.println("Poll for results...");

      // Wait for execution to finish, then print results.
      while (!finished && backoffTime < backoffTimeout) {
        Execution execution = executionsClient.getExecution(executionName);
        finished = execution.getState() != Execution.State.ACTIVE;

        // If we haven't seen the results yet, wait.
        if (!finished) {
          System.out.println("- Waiting for results");
          // Never sleep past the overall timeout; the loop condition guarantees the
          // remaining budget is positive here.
          long sleepMillis = Math.min(backoffDelay, backoffTimeout - backoffTime);
          Thread.sleep(sleepMillis);
          backoffTime += sleepMillis;
          backoffDelay *= 2; // Double the delay to provide exponential backoff.
        } else {
          System.out.println("Execution finished with state: " + execution.getState().name());
          System.out.println("Execution results: " + execution.getResult());
        }
      }

      if (!finished) {
        // Make the timeout visible instead of returning silently.
        System.out.printf(
            "Timed out after %d ms waiting for execution %s to finish.%n",
            backoffTimeout, executionName);
      }
    }
  }
}

Node.js

const {ExecutionsClient} = require('@google-cloud/workflows');
// Module-level client; executeWorkflow uses it to create and poll executions.
const client = new ExecutionsClient();
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'my-project';
// const location = 'us-central1';
// const workflow = 'myFirstWorkflow';
// const searchTerm = '';

/**
 * Starts an execution of a workflow and polls it until it is no longer
 * ACTIVE, doubling the wait between polls (exponential backoff).
 * Errors are logged to stderr; in that case the promise resolves to undefined.
 * @param {string} projectId The Google Cloud Project containing the workflow
 * @param {string} location The workflow location
 * @param {string} workflow The workflow name
 * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
 * @returns {Promise<*>} The `result` field of the finished execution
 */
async function executeWorkflow(projectId, location, workflow, searchTerm) {
  // Resolves after `ms` milliseconds.
  const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

  // Only include the search term when one was supplied.
  const runtimeArgs = {};
  if (searchTerm) {
    runtimeArgs.searchTerm = searchTerm;
  }

  try {
    // Start the execution. Runtime arguments are passed as a JSON string.
    const [createdExecution] = await client.createExecution({
      parent: client.workflowPath(projectId, location, workflow),
      execution: {argument: JSON.stringify(runtimeArgs)},
    });
    const executionName = createdExecution.name;
    console.log(`Created execution: ${executionName}`);

    console.log('Poll every second for result...');
    // First wait is 1,000 ms; each later wait is twice the previous one.
    for (let delayMs = 1000; ; delayMs *= 2) {
      const [execution] = await client.getExecution({name: executionName});

      if (execution.state !== 'ACTIVE') {
        console.log(`Execution finished with state: ${execution.state}`);
        console.log(execution.result);
        return execution.result;
      }

      // Still running: back off before the next poll.
      console.log('- Waiting for results...');
      await sleep(delayMs);
    }
  } catch (e) {
    console.error(`Error executing workflow: ${e}`);
  }
}

// Run the sample with the variables from the TODO block of this sample
// (projectId, location, workflow, searchTerm). A rejected promise is reported
// and marks the process as failed.
executeWorkflow(projectId, location, workflow, searchTerm).catch(err => {
  console.error(err.message);
  process.exitCode = 1;
});

Node.js (TypeScript)

import {ExecutionsClient} from '@google-cloud/workflows';
// Module-level client; executeWorkflow uses it to create and poll executions.
const client: ExecutionsClient = new ExecutionsClient();
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'my-project';
// const location = 'us-central1';
// const workflow = 'myFirstWorkflow';
// const searchTerm = '';

/**
 * Starts an execution of a workflow and polls it until it is no longer
 * ACTIVE, doubling the wait between polls (exponential backoff).
 * Errors are logged to stderr; in that case the promise resolves to undefined.
 * @param {string} projectId The Google Cloud Project containing the workflow
 * @param {string} location The workflow location
 * @param {string} workflow The workflow name
 * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
 */
async function executeWorkflow(
  projectId: string,
  location: string,
  workflow: string,
  searchTerm: string
) {
  // Resolves after `ms` milliseconds.
  const sleep = (ms: number): Promise<unknown> =>
    new Promise(resolve => setTimeout(resolve, ms));

  // Only include the search term when one was supplied.
  const runtimeArgs: {searchTerm?: string} = {};
  if (searchTerm) {
    runtimeArgs.searchTerm = searchTerm;
  }

  try {
    // Start the execution. Runtime arguments are passed as a JSON string.
    const [createdExecution] = await client.createExecution({
      parent: client.workflowPath(projectId, location, workflow),
      execution: {argument: JSON.stringify(runtimeArgs)},
    });
    const executionName = createdExecution.name;
    console.log(`Created execution: ${executionName}`);

    console.log('Poll every second for result...');
    // First wait is 1,000 ms; each later wait is twice the previous one.
    for (let delayMs = 1000; ; delayMs *= 2) {
      const [execution] = await client.getExecution({name: executionName});

      if (execution.state !== 'ACTIVE') {
        console.log(`Execution finished with state: ${execution.state}`);
        console.log(execution.result);
        return execution.result;
      }

      // Still running: back off before the next poll.
      console.log('- Waiting for results...');
      await sleep(delayMs);
    }
  } catch (e) {
    console.error(`Error executing workflow: ${e}`);
  }
}

// Run the sample with the variables from the TODO block of this sample
// (projectId, location, workflow, searchTerm). A rejected promise is reported
// and marks the process as failed.
executeWorkflow(projectId, location, workflow, searchTerm).catch(
  (err: Error) => {
    console.error(err.message);
    process.exitCode = 1;
  }
);

Python

このサンプルを試す前に、クライアント ライブラリを使用した Workflows クイックスタートの Python の手順に沿って設定を行ってください。詳細については、Workflows Python API のリファレンス ドキュメントをご覧ください。

Workflows に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import time

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1 import Execution
from google.cloud.workflows.executions_v1.types import executions


def execute_workflow(
    project: str, location: str = "us-central1", workflow: str = "myFirstWorkflow"
) -> Execution:
    """Start a workflow execution and poll it until it is no longer active.

    The execution is polled with exponential backoff: the first wait is one
    second and every following wait is twice as long as the previous one.
    The final state and result are printed once the execution has finished.

    Args:
        project: The Google Cloud project id which contains the workflow to execute.
        location: The location for the workflow.
        workflow: The ID of the workflow to execute.

    Returns:
        The execution as returned by the last poll, i.e. in a non-ACTIVE state.
    """
    # API clients: one for executions, one used to build the workflow path.
    executions_api = executions_v1.ExecutionsClient()
    workflows_api = workflows_v1.WorkflowsClient()

    # Fully qualified resource name of the workflow to run.
    workflow_name = workflows_api.workflow_path(project, location, workflow)

    # Kick off the execution.
    created = executions_api.create_execution(request={"parent": workflow_name})
    print(f"Created execution: {created.name}")

    print("Poll for result...")
    delay_seconds = 1  # Initial wait; doubled after every unsuccessful poll.
    while True:
        current = executions_api.get_execution(request={"name": created.name})

        if current.state != executions.Execution.State.ACTIVE:
            print(f"Execution finished with state: {current.state.name}")
            print(f"Execution results: {current.result}")
            return current

        # Still running: back off before asking again.
        print("- Waiting for results...")
        time.sleep(delay_seconds)
        delay_seconds *= 2

次のステップ

他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。