Cloud クライアント ライブラリを使用してワークフローを実行する

このクイックスタートでは、Cloud クライアント ライブラリを使用してワークフローを実行し、実行結果を表示する方法について説明します。

Cloud クライアント ライブラリのインストールと開発環境の設定の詳細については、Workflows クライアント ライブラリの概要をご覧ください。

ターミナルまたは Cloud Shell で Google Cloud CLI を使用して、次の手順を完了できます。

始める前に

組織で定義されているセキュリティの制約により、次の手順を完了できない場合があります。トラブルシューティング情報については、制約のある環境でアプリケーションを開発する Google Cloud をご覧ください。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Workflows API:

    gcloud services enable workflows.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
  14. (省略可)Cloud Logging にログを送信するには、サービス アカウントに roles/logging.logWriter ロールを付与します。

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member "serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/logging.logWriter"

    サービス アカウントのロールと権限の詳細については、ワークフローにGoogle Cloud リソースへのアクセス権限を付与するをご覧ください。

  15. 必要に応じて、Git ソースコード管理ツールをダウンロードしてインストールします。

サンプル ワークフローをデプロイする

ワークフローを定義したら、デプロイして実行できるようにします。デプロイの手順では、ソースファイルを実行できることも検証されます。

次のワークフローは、リクエストを公開 API に送信してから、API のレスポンスを返します。

  1. 次の内容のテキスト ファイルを myFirstWorkflow.yaml という名前で作成します。

    - getCurrentTime:
        call: http.get
        args:
          url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
        result: currentTime
    - readWikipedia:
        call: http.get
        args:
          url: https://en.wikipedia.org/w/api.php
          query:
            action: opensearch
            search: ${currentTime.body.dayOfWeek}
        result: wikiResult
    - returnResult:
        return: ${wikiResult.body[1]}
  2. ワークフローを作成したらデプロイできますが、そのワークフローは実行しないでください

    gcloud workflows deploy myFirstWorkflow \
        --source=myFirstWorkflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
        --location=CLOUD_REGION

    CLOUD_REGION は、ワークフローのサポートされているロケーションに置き換えます。コードサンプルで使用されているデフォルトのリージョンは us-central1 です。

サンプルコードを取得する

サンプルコードは GitHub からクローンを作成できます。

  1. ローカルマシンにサンプルアプリのリポジトリのクローンを作成します。

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Node.js

    git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

    または、zip 形式のサンプルをダウンロードし、ファイルを抽出してもかまいません。

  2. Workflows のサンプルコードが含まれているディレクトリに移動します。

    Java

    cd java-docs-samples/workflows/cloud-client/

    Node.js

    cd nodejs-docs-samples/workflows/quickstart/

    Python

    cd python-docs-samples/workflows/cloud-client/

  3. サンプルコードを見てみましょう。各サンプルアプリは次の処理を行います。

    1. Workflows 用の Cloud クライアント ライブラリを設定します。
    2. ワークフローを実行します。
    3. 実行が終了するまで、ワークフローの実行をポーリングします(指数バックオフを使用)。
    4. 実行結果を出力します。

    Java

    // Imports the Google Cloud client library
    
    import com.google.cloud.workflows.executions.v1.CreateExecutionRequest;
    import com.google.cloud.workflows.executions.v1.Execution;
    import com.google.cloud.workflows.executions.v1.ExecutionsClient;
    import com.google.cloud.workflows.executions.v1.WorkflowName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class WorkflowsQuickstart {
    
      private static final String PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
      private static final String LOCATION = System.getenv().getOrDefault("LOCATION", "us-central1");
      private static final String WORKFLOW =
          System.getenv().getOrDefault("WORKFLOW", "myFirstWorkflow");
    
      public static void main(String... args)
          throws IOException, InterruptedException, ExecutionException {
        if (PROJECT == null) {
          throw new IllegalArgumentException(
              "Environment variable 'GOOGLE_CLOUD_PROJECT' is required to run this quickstart.");
        }
        workflowsQuickstart(PROJECT, LOCATION, WORKFLOW);
      }
    
      private static volatile boolean finished;
    
      public static void workflowsQuickstart(String projectId, String location, String workflow)
          throws IOException, InterruptedException, ExecutionException {
        // Initialize client that will be used to send requests. This client only needs
        // to be created once, and can be reused for multiple requests. After completing all of your
        // requests, call the "close" method on the client to safely clean up any remaining background
        // resources.
        try (ExecutionsClient executionsClient = ExecutionsClient.create()) {
          // Construct the fully qualified location path.
          WorkflowName parent = WorkflowName.of(projectId, location, workflow);
    
          // Creates the execution object.
          CreateExecutionRequest request =
              CreateExecutionRequest.newBuilder()
                  .setParent(parent.toString())
                  .setExecution(Execution.newBuilder().build())
                  .build();
          Execution response = executionsClient.createExecution(request);
    
          String executionName = response.getName();
          System.out.printf("Created execution: %s%n", executionName);
    
          long backoffTime = 0;
          long backoffDelay = 1_000; // Start wait with delay of 1,000 ms
          final long backoffTimeout = 10 * 60 * 1_000; // Time out at 10 minutes
          System.out.println("Poll for results...");
    
          // Wait for execution to finish, then print results.
          while (!finished && backoffTime < backoffTimeout) {
            Execution execution = executionsClient.getExecution(executionName);
            finished = execution.getState() != Execution.State.ACTIVE;
    
            // If we haven't seen the results yet, wait.
            if (!finished) {
              System.out.println("- Waiting for results");
              Thread.sleep(backoffDelay);
              backoffTime += backoffDelay;
              backoffDelay *= 2; // Double the delay to provide exponential backoff.
            } else {
              System.out.println("Execution finished with state: " + execution.getState().name());
              System.out.println("Execution results: " + execution.getResult());
            }
          }
        }
      }
    }

    Node.js

    const {ExecutionsClient} = require('@google-cloud/workflows');
    const client = new ExecutionsClient();
    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'my-project';
    // const location = 'us-central1';
    // const workflow = 'myFirstWorkflow';
    // const searchTerm = '';
    
    /**
     * Executes a Workflow and waits for the results with exponential backoff.
     * @param {string} projectId The Google Cloud Project containing the workflow
     * @param {string} location The workflow location
     * @param {string} workflow The workflow name
     * @param {string} searchTerm Optional search term to pass to the Workflow as a runtime argument
     */
    async function executeWorkflow(projectId, location, workflow, searchTerm) {
      /**
       * Sleeps the process N number of milliseconds.
       * @param {Number} ms The number of milliseconds to sleep.
       */
      function sleep(ms) {
        return new Promise(resolve => {
          setTimeout(resolve, ms);
        });
      }
      const runtimeArgs = searchTerm ? {searchTerm: searchTerm} : {};
      // Execute workflow
      try {
        const createExecutionRes = await client.createExecution({
          parent: client.workflowPath(projectId, location, workflow),
          execution: {
            // Runtime arguments can be passed as a JSON string
            argument: JSON.stringify(runtimeArgs),
          },
        });
        const executionName = createExecutionRes[0].name;
        console.log(`Created execution: ${executionName}`);
    
        // Wait for execution to finish, then print results.
        let executionFinished = false;
        let backoffDelay = 1000; // Start wait with delay of 1,000 ms
        console.log('Poll every second for result...');
        while (!executionFinished) {
          const [execution] = await client.getExecution({
            name: executionName,
          });
          executionFinished = execution.state !== 'ACTIVE';
    
          // If we haven't seen the result yet, wait a second.
          if (!executionFinished) {
            console.log('- Waiting for results...');
            await sleep(backoffDelay);
            backoffDelay *= 2; // Double the delay to provide exponential backoff.
          } else {
            console.log(`Execution finished with state: ${execution.state}`);
            console.log(execution.result);
            return execution.result;
          }
        }
      } catch (e) {
        console.error(`Error executing workflow: ${e}`);
      }
    }
    
    executeWorkflow(projectId, location, workflowName, searchTerm).catch(err => {
      console.error(err.message);
      process.exitCode = 1;
    });
    

    Python

    import os
    import time
    
    from google.cloud import workflows_v1
    from google.cloud.workflows import executions_v1
    from google.cloud.workflows.executions_v1 import Execution
    from google.cloud.workflows.executions_v1.types import executions
    
    PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
    LOCATION = os.getenv("LOCATION", "us-central1")
    WORKFLOW_ID = os.getenv("WORKFLOW", "myFirstWorkflow")
    
    
    def execute_workflow(project: str, location: str, workflow: str) -> Execution:
        """Execute a workflow and print the execution results.
    
        A workflow consists of a series of steps described
        using the Workflows syntax, and can be written in either YAML or JSON.
    
        Args:
            project: The Google Cloud project id
                which contains the workflow to execute.
            location: The location for the workflow
            workflow: The ID of the workflow to execute.
    
        Returns:
            The execution response.
        """
        # Set up API clients.
        execution_client = executions_v1.ExecutionsClient()
        workflows_client = workflows_v1.WorkflowsClient()
    
        # Construct the fully qualified location path.
        parent = workflows_client.workflow_path(project, location, workflow)
    
        # Execute the workflow.
        response = execution_client.create_execution(request={"parent": parent})
        print(f"Created execution: {response.name}")
    
        # Wait for execution to finish, then print results.
        execution_finished = False
        backoff_delay = 1  # Start wait with delay of 1 second
        print("Poll for result...")
        while not execution_finished:
            execution = execution_client.get_execution(
                request={"name": response.name}
            )
            execution_finished = execution.state != executions.Execution.State.ACTIVE
    
            # If we haven't seen the result yet, wait a second.
            if not execution_finished:
                print("- Waiting for results...")
                time.sleep(backoff_delay)
                # Double the delay to provide exponential backoff.
                backoff_delay *= 2
            else:
                print(f"Execution finished with state: {execution.state.name}")
                print(f"Execution results: {execution.result}")
                return execution
    
    
    if __name__ == "__main__":
        assert PROJECT, "'GOOGLE_CLOUD_PROJECT' environment variable not set."
        execute_workflow(PROJECT, LOCATION, WORKFLOW_ID)

サンプルコードを実行する

サンプルコードを実行してワークフローを実行できます。ワークフローを実行すると、ワークフローに関連付けられたデプロイされたワークフロー定義が実行されます。

  1. サンプルを実行するには、まず依存関係をインストールします。

    Java

    mvn compile

    Node.js

    npm install -D tsx

    Python

    pip3 install -r requirements.txt

  2. スクリプトを実行します。

    Java

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME mvn compile exec:java -Dexec.mainClass=com.example.workflows.WorkflowsQuickstart

    Node.js

    npx tsx index.js

    Python

    GOOGLE_CLOUD_PROJECT=PROJECT_ID LOCATION=CLOUD_REGION WORKFLOW=WORKFLOW_NAME python3 main.py

    次のように置き換えます。

    • PROJECT_ID: プロジェクト名 Google Cloud
    • CLOUD_REGION: ワークフローのロケーション(デフォルト: us-central1
    • WORKFLOW_NAME: ワークフロー名(デフォルト: myFirstWorkflow

    出力は次のようになります。

    Execution finished with state: SUCCEEDED
    Execution results: ["Thursday","Thursday Night Football","Thursday (band)","Thursday Island","Thursday (album)","Thursday Next","Thursday at the Square","Thursday's Child (David Bowie song)","Thursday Afternoon","Thursday (film)"]
    

実行リクエストでのデータ渡し

クライアント ライブラリの言語によっては、実行リクエストでランタイム引数を渡すこともできます。次に例を示します。

Java

// Creates the execution object.
CreateExecutionRequest request =
    CreateExecutionRequest.newBuilder()
        .setParent(parent.toString())
        .setExecution(Execution.newBuilder().setArgument("{\"searchTerm\":\"Friday\"}").build())
        .build();

Node.js

// Execute workflow
try {
  const createExecutionRes = await client.createExecution({
    parent: client.workflowPath(projectId, location, workflow),
    execution: {
      argument: JSON.stringify({"searchTerm": "Friday"})
    }
});
const executionName = createExecutionRes[0].name;

ランタイム引数を渡す方法について詳しくは、実行リクエストでのランタイム引数渡しをご覧ください。

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。

  1. 作成したワークフローを削除します。

    gcloud workflows delete myFirstWorkflow
    
  2. 続行を確認するメッセージが表示されたら、「y」と入力します。

ワークフローが削除されます。

次のステップ