ワークフローを実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。

ワークフロー実行リクエストでランタイム引数を渡し、ワークフロー変数を使用してこれらの引数にアクセスできます。詳細については、実行リクエストでランタイム引数を渡すをご覧ください。

ワークフローの実行が完了すると、履歴と結果が一定期間保持されます。詳細については、割り当てと上限をご覧ください。

準備

組織で定義されているセキュリティの制約により、次の手順を完了できない場合があります。トラブルシューティング情報については、制約のある Google Cloud 環境でアプリケーションを開発するをご覧ください。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. ワークフローが他の Google Cloud リソースにアクセスする場合、適切な権限を持つサービス アカウントに関連付けられていることを確認します。既存のワークフローに関連付けられているサービス アカウントについては、ワークフローに関連付けられたサービス アカウントの確認をご覧ください。

    リソースを作成してサービス アカウントを関連付けるには、対象のリソースを作成し、リソースに関連付けるサービス アカウントになりすますための権限が必要です。詳細については、サービス アカウント権限をご覧ください。

  7. Google Cloud コンソールまたは Google Cloud CLI を使用してワークフローをデプロイします。

ワークフローを実行する

クライアント ライブラリ、Google Cloud コンソール、gcloud CLI を使用するか、Workflows REST API にリクエストを送信して、ワークフローを実行できます。

コンソール

  1. ワークフローを実行するには、Google Cloud コンソールで [Workflows] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、ワークフローを選択して詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. [ワークフローの実行] ページの [入力] ペインに、ワークフローに渡すオプションのランタイム引数を入力できます。引数は JSON 形式にする必要があります(例: {"animal":"cat"})。ワークフローでランタイム引数を使用しない場合は、ここは空白のままにします。

  5. 必要に応じて、ワークフローの実行中に適用するコールロギングのレベルを指定します。[呼び出しログレベル] リストで、次のいずれかを選択します。

    • 指定なし: ロギングレベルは指定されません。この設定がデフォルトです。実行ログレベルが指定されていない場合を除き、実行ログレベルがワークフローのログレベルよりも優先されます(デフォルト)。実行ログレベルが指定されていない場合、ワークフローのログレベルが適用されます。
    • エラーのみ: キャッチされた例外をすべてログに記録します。または、例外により呼び出しが停止した場合についてもログに記録します。
    • すべてのコール: サブワークフローまたはライブラリ関数のすべての呼び出しとその結果を記録します。
    • ログがない: コールロギングはありません。

  6. [実行] をクリックします。

  7. [実行の詳細] ページで、出力 、実行 ID と状態、ワークフロー実行の現在のステップまたは最後のステップを含む、実行結果を確認できます。詳細については、ワークフローの実行結果にアクセスするをご覧ください。

gcloud

  1. ターミナルを開きます。

  2. 実行するワークフローの名前を見つけます。ワークフローの名前がわからない場合は、次のコマンドを入力して、すべてのワークフローを一覧表示できます。

    gcloud workflows list
  3. ワークフローを実行するには、gcloud workflows run コマンドまたは gcloud workflows execute コマンドを使用します。

    • ワークフローを実行し、実行が完了するまで待ちます。

      gcloud workflows run WORKFLOW_NAME \
      --call-log-level=CALL_LOGGING_LEVEL \
      --data=DATA
    • 実行の試行の完了を待たずにワークフローを実行します。

      gcloud workflows execute WORKFLOW_NAME \
      --call-log-level=CALL_LOGGING_LEVEL \
      --data=DATA

      以下を置き換えます。

      • WORKFLOW_NAME: ワークフローの名前。
      • CALL_LOGGING_LEVEL(省略可): 実行中に適用するコールロギングのレベル。以下のいずれかになります。

        • none: ロギングレベルは指定されません。この設定がデフォルトです。実行ログレベルが指定されていない場合を除き、実行ログレベルがワークフローのログレベルよりも優先されます(デフォルト)。実行ログレベルが指定されていない場合、ワークフローのログレベルが適用されます。
        • log-errors-only: キャッチされた例外をすべてログに記録します。または、例外により呼び出しが停止した場合についてもログに記録します。
        • log-all-calls: サブワークフローまたはライブラリ関数のすべての呼び出しとその結果をログに記録します。
        • log-none: コールロギングなし。
      • DATA(省略可): ワークフローのランタイム引数(JSON 形式)。

  4. gcloud workflows execute を実行すると、ワークフロー実行の試行の一意の ID が返され、出力は次のようになります。

     To view the workflow status, you can use following command:
     gcloud workflows executions describe b113b589-8eff-4968-b830-8d35696f0b33 --workflow workflow-2 --location us-central1

    実行のステータスを表示するには、前の手順で返されたコマンドを入力します。

実行の試行が成功した場合、出力は次のようになります。state はワークフローの正常終了を示し、status は実行の最終ワークフロー ステップを示します。

argument: '{"searchTerm":"Friday"}'
endTime: '2022-06-22T12:17:53.086073678Z'
name: projects/1051295516635/locations/us-central1/workflows/myFirstWorkflow/executions/c4dffd1f-13db-46a0-8a4a-ee39c144cb96
result: '["Friday","Friday the 13th (franchise)","Friday Night Lights (TV series)","Friday
    the 13th (1980 film)","Friday the 13th","Friday the 13th (2009 film)","Friday the
    13th Part III","Friday the 13th Part 2","Friday (Rebecca Black song)","Friday Night
    Lights (film)"]'
startTime: '2022-06-22T12:17:52.799387653Z'
state: SUCCEEDED
status:
    currentSteps:
    - routine: main
        step: returnOutput
workflowRevisionId: 000001-ac2

クライアント ライブラリ

次のサンプルでは、ワークフロー myFirstWorkflow をデプロイしていることを前提としています。

  1. クライアント ライブラリをインストールし、開発環境を設定します。詳細については、Workflows クライアント ライブラリの概要をご覧ください。

  2. ローカルマシンにサンプルアプリのリポジトリのクローンを作成します。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

  3. Workflows のサンプルコードが含まれているディレクトリに移動します。

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  4. サンプルコードを見てみましょう。

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js (JavaScript)

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Node.js (TypeScript)

    import {ExecutionsClient} from '@google-cloud/workflows';
    const client: ExecutionsClient = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(
      projectId: string,
      location: string,
      workflow: string,
      searchTerm: string
    ) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms: number): Promise<unknown> {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(
      (err: Error) => {
        console.error(err.message);
        process.exitCode = 1;
      }
    );

    Python

    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    from google.cloud.workflows.executions_v1 import Execution
    from google.cloud.workflows.executions_v1.types import executions
    
    def execute_workflow(
        project: str, location: str = "us-central1", workflow: str = "myFirstWorkflow"
    ) -> Execution:
        """Execute a workflow and print the execution results.
    
        A workflow consists of a series of steps described using the Workflows syntax, and can be written in either YAML or JSON.
    
        Args:
            project: The Google Cloud project id which contains the workflow to execute.
            location: The location for the workflow
            workflow: The ID of the workflow to execute.
    
        Returns:
            The execution response.
        """
        # Set up API clients.
        execution_client = executions_v1.ExecutionsClient()
        workflows_client = workflows_v1.WorkflowsClient()
        # Construct the fully qualified location path.
        parent = workflows_client.workflow_path(project, location, workflow)
    
        # Execute the workflow.
        response = execution_client.create_execution(request={"parent": parent})
        print(f"Created execution: {response.name}")
    
        # Wait for execution to finish, then print results.
        execution_finished = False
        backoff_delay = 1  # Start wait with delay of 1 second
        print("Poll for result...")
        while not execution_finished:
            execution = execution_client.get_execution(request={"name": response.name})
            execution_finished = execution.state != executions.Execution.State.ACTIVE
    
            # If we haven't seen the result yet, wait a second.
            if not execution_finished:
                print("- Waiting for results...")
                time.sleep(backoff_delay)
                # Double the delay to provide exponential backoff.
                backoff_delay *= 2
            else:
                print(f"Execution finished with state: {execution.state.name}")
                print(f"Execution results: {execution.result}")
                return execution
    
    

    サンプルでは以下を行います。

    1. Workflows 用の Cloud クライアント ライブラリを設定します。
    2. ワークフローを実行します。
    3. 実行が終了するまで、ワークフローの実行をポーリングします(指数バックオフを使用)。
    4. 実行結果を出力します。
  5. サンプルを実行するには、まず依存関係をインストールします。

    Java

    mvn compile

    Node.js (JavaScript)

    npm install

    Node.js (TypeScript)

    npm install && npm run build

    Python

    pip3 install -r requirements.txt

  6. スクリプトを実行します。

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js (JavaScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Node.js (TypeScript)

    npm start PROJECT_ID CLOUD_REGION WORKFLOW_NAME

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    以下を置き換えます。

    • PROJECT_ID (必須)Google Cloud プロジェクトのプロジェクト ID
    • CLOUD_REGION: ワークフローのロケーション(デフォルト: us-central1
    • WORKFLOW_NAME: ワークフローの ID(デフォルト: myFirstWorkflow

    出力は次のようになります。

    Execution finished with state: SUCCEEDED
    ["Sunday","Sunday in the Park with George","Sunday shopping","Sunday Bloody Sunday","Sunday Times Golden Globe Race","Sunday All Stars","Sunday Night (South Korean TV series)","Sunday Silence","Sunday Without God","Sunday Independent (Ireland)"]
    

REST API

特定のワークフローの最新のリビジョンを使用して新しい実行を作成するには、projects.locations.workflows.executions.create メソッドを使用します。

認証を行うには、ワークフローを実行するための十分な権限を持つサービス アカウントが必要です。たとえば、サービス アカウントに Workflows 起動元ロール(roles/workflows.invoker) を付与して、そのアカウントにワークフロー実行をトリガーする権限を持たせることができます。詳細については、Workflows を呼び出すをご覧ください。

リクエストのデータを使用する前に、次のように置き換えます。

  • PROJECT_NUMBER: [IAM と管理] の [設定] ページに表示される Google Cloud プロジェクト番号。
  • LOCATION: ワークフローがデプロイされているリージョン(例: us-central1)。
  • WORKFLOW_NAME: ワークフローのユーザー定義名(例: myFirstWorkflow)。
  • PARAMETER: 省略可。実行しているワークフローが、実行リクエストの一部として渡すランタイム引数を受け取ることができる場合、値が1 つ以上のエスケープされたパラメータ値のペアである JSON 形式の文字列をリクエスト本文に追加できます(例: "{\"searchTerm\":\"asia\"}")。
  • VALUE: 省略可。ワークフローがランタイム引数として受け取るパラメータ値のペアの値。
  • CALL_LOGGING_LEVEL: 省略可。 実行中に適用する呼び出しロギングのレベル。デフォルトではロギングレベルが指定されず、代わりにワークフロー ログレベルが適用されます。詳細については、Logging にログを送信するをご覧ください。次のいずれかになります。
    • CALL_LOG_LEVEL_UNSPECIFIED: ロギングレベルは指定されず、代わりにワークフロー ログレベルが適用されます。この設定がデフォルトです。それ以外の場合、実行ログレベルが適用され、ワークフロー ログレベルよりも優先されます。
    • LOG_ERRORS_ONLY: キャッチされた例外をすべてログに記録します。または、例外により呼び出しが停止した場合についてもログに記録します。
    • LOG_ALL_CALLS: サブワークフローまたはライブラリ関数のすべての呼び出しとその結果をログに記録します。
    • LOG_NONE: コールロギングなし。

JSON 本文のリクエスト:

{
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "callLogLevel": "CALL_LOGGING_LEVEL"
}

リクエストを送信するには、次のいずれかのオプションを展開します。

成功した場合、レスポンスの本文には、新しく作成された Execution のインスタンスが含まれます。

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID",
  "startTime": "2023-11-07T14:35:27.215337069Z",
  "state": "ACTIVE",
  "argument": "{\"PARAMETER\":\"VALUE\"}",
  "workflowRevisionId": "000001-2df",
  "callLogLevel": "CALL_LOGGING_LEVEL",
  "status": {}
}

実行のステータスを確認する

ワークフロー実行のステータスの確認に役立つコマンドがいくつかあります。

  • ワークフロー実行の試行とその ID のリストを取得するには、次のコマンドを入力します。

    gcloud workflows executions list WORKFLOW_NAME

    WORKFLOW_NAME はワークフローの名前で置き換えます。

    このコマンドは、次のような NAME 値を返します。

    projects/PROJECT_NUMBER/locations/REGION/workflows/WORKFLOW_NAME/executions/EXECUTION_ID

    次のコマンドで使用する実行 ID をコピーします。

  • 実行の試行のステータスを確認し、試行が完了するまで待機するには、次のコマンドを入力します。

    gcloud workflows executions wait EXECUTION_ID

    EXECUTION_ID を実行の試行 ID に置き換えます。

    このコマンドは、実行の試行の完了を待ってから、結果を返します。

  • 最後の実行が完了するまで待ってから、完了した実行の結果を返すには、次のコマンドを入力します。

    gcloud workflows executions wait-last

    同じ gcloud セッションで以前の実行を試行した場合、コマンドは、以前の実行の試行が完了するのを待ってから、完了した実行の結果を返します。以前の試行が存在しない場合、gcloud は次のエラーを返します。

    ERROR: (gcloud.workflows.executions.wait-last) [NOT FOUND] There are no cached executions available.
    
  • 最後の実行のステータスを取得するには、次のコマンドを入力します。

    gcloud workflows executions describe-last

    同じ gcloud セッションで以前の実行を試行した場合、たとえそれが実行中であっても、コマンドは最後の実行の結果を返します。以前の試行が存在しない場合、gcloud は次のエラーを返します。

    ERROR: (gcloud.beta.workflows.executions.describe-last) [NOT FOUND] There are no cached executions available.
    

実行をフィルタ

workflows.executions.list メソッドによって返されるワークフロー実行のリストにフィルタを適用できます。

次のフィールドでフィルタリングできます。

  • duration
  • endTime
  • executionId
  • label
  • startTime
  • state
  • stepName
  • workflowRevisionId

たとえば、ラベル(labels."fruit":"apple")でフィルタリングするには、次のような API リクエストを行います。

GET https://workflowexecutions.googleapis.com/v1/projects/MY_PROJECT/locations/MY_LOCATION/workflows/MY_WORKFLOW/executions?view=full&filter=labels.%22fruit%22%3A%22apple%22"

ここで

  • view=full は、返される実行で入力するフィールドを定義するビューを指定します。この場合、すべてのデータ
  • labels.%22fruit%22%3A%22apple%22 は、URL エンコードされたフィルタ構文です。

詳細については、AIP-160 フィルタリングをご覧ください。

次のステップ