Configure custom status events to describe runnables

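This document describes how to configure custom status events, which describe a job's runnables, when you create and run a Batch job. To learn about status events, see View a job's history through status events.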

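Custom status events let you add details to a task's history about the progress of its runnables, which makes a job easier to analyze and troubleshoot. For example, you can configure custom status events that describe when a runnable starts, when a runnable ends, when a barrier runnable is reached, or when an important event occurs while your code runs.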

Before you begin

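  1. If you haven't used Batch before, review Get started with Batch and enable Batch by completing the prerequisites for projects and users.
  2. To get the permissions that you need to create a job, ask your administrator to grant you the following IAM roles: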

    For more information about granting roles, see Manage access to projects, folders, and organizations.

    You might also be able to get the required permissions through custom roles or other predefined roles.

Configure custom status events

When you create a job, configure custom status events by using one or more of the following options:

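  • Describe the state of a runnable by defining the runnable's display name. You can do this when you create a job by using the gcloud CLI, the Batch API, or a client library.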

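  • Indicate important runtime events by writing a structured task log that includes the batch/custom/event field for each event. You can do this in the definitions of script and container runnables when you create a job by using any method.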

Describe the state of a runnable

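You can configure custom status events that describe the state of a runnable by defining the runnable's display name (displayName field). The resulting custom status events differ slightly depending on the type of runnable: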

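  • If you define a display name for a container runnable or a script runnable, Batch automatically adds two custom status events. The first indicates when a task starts the runnable. The second indicates when the task finishes the runnable and includes the corresponding exit code.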

  • If you define a display name for a barrier runnable, Batch automatically adds one custom status event that indicates when a task reaches the barrier.
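As an illustration of these rules, the following Python sketch predicts which custom status events a task reports for a list of runnable definitions written as JSON-style dictionaries. It is a hypothetical helper, not part of Batch: the event types come from the sample output later on this page, and treating container runnables the same way as script runnables is an assumption based on the description above.

```python
from typing import Dict, List, Tuple

# Hypothetical helper: event types follow this page's sample output.
# Treating container runnables like script runnables is an assumption.
Event = Tuple[str, int, str, str]  # (event type, runnable index, display name, phase)


def expected_custom_events(runnables: List[Dict]) -> List[Event]:
    """Return the custom status events a task would report, in runnable order."""
    events: List[Event] = []
    for index, runnable in enumerate(runnables):
        name = runnable.get("displayName")
        if not name:
            # Runnables without a display name add no custom status events.
            continue
        if "barrier" in runnable:
            # A barrier with a display name adds one event when a task reaches it.
            events.append(("BARRIER_REACHED_EVENT", index, name, "reached"))
        elif "script" in runnable or "container" in runnable:
            # Script and container runnables add a start event and a finish event.
            events.append(("RUNNABLE_EVENT", index, name, "started"))
            events.append(("RUNNABLE_EVENT", index, name, "finished"))
    return events


example = [
    {"displayName": "script 1", "script": {"text": "echo 1"}},
    {"displayName": "barrier 1", "barrier": {}},
    {"displayName": "script 2", "script": {"text": "echo 2"}},
    {"script": {"text": "echo 3"}},  # no display name, so no custom events
]
for event in expected_custom_events(example):
    print(event)
```

Note that runnable indexes count every runnable, including barriers, so the barrier in this example is at index 1.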

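To create and run a job with custom status events that describe the state of runnables, define the displayName field for one or more runnables by using the gcloud CLI, the Batch API, or a client library.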

gcloud

Create a job by using the Google Cloud CLI, and add the displayName field to one or more runnables definitions in the JSON configuration file:

...
"runnables": [
  {
    "displayName":DISPLAY_NAME,
    ...
  }
]
...

For example, a job with custom status events that describe the state of each runnable might have a JSON configuration file similar to the following:

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "displayName":"DISPLAY_NAME1",
            "script": {
              "text": "echo Hello world from script 1 for task ${BATCH_TASK_INDEX}"
            }
          },
          {
            "displayName":"DISPLAY_NAME2",
            "barrier": {}
          },
          {
            "displayName":"DISPLAY_NAME3",
            "script": {
              "text": "echo Hello world from script 2 for task ${BATCH_TASK_INDEX}"
            }
          }
        ]
      },
      "taskCount": 3
    }
  ],
  "logsPolicy": {
    "destination": "CLOUD_LOGGING"
  }
}

DISPLAY_NAME1DISPLAY_NAME2DISPLAY_NAME3 替换为可运行项的名称,该名称必须在作业中保持唯一性,例如 script 1barrier 1script 2

API

Create a job by using the REST API, and include the displayName field in one or more runnables definitions in the JSON request body:

...
"runnables": [
  {
    "displayName":DISPLAY_NAME,
    ...
  }
]
...

For example, a job with custom status events that describe the state of each runnable might have a JSON configuration file similar to the following:

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "displayName":"DISPLAY_NAME1",
            "script": {
              "text": "echo Hello world from script 1 for task ${BATCH_TASK_INDEX}"
            }
          },
          {
            "displayName":"DISPLAY_NAME2",
            "barrier": {}
          },
          {
            "displayName":"DISPLAY_NAME3",
            "script": {
              "text": "echo Hello world from script 2 for task ${BATCH_TASK_INDEX}"
            }
          }
        ]
      },
      "taskCount": 3
    }
  ],
  "logsPolicy": {
    "destination": "CLOUD_LOGGING"
  }
}

DISPLAY_NAME1DISPLAY_NAME2DISPLAY_NAME3 替换为可运行项的名称,该名称必须在作业中保持唯一性,例如 script 1barrier 1script 2

Go

import (
	"context"
	"fmt"
	"io"

	batch "cloud.google.com/go/batch/apiv1"
	"cloud.google.com/go/batch/apiv1/batchpb"
	durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// Creates and runs a job with custom events
func createJobWithCustomEvents(w io.Writer, projectID, jobName string) (*batchpb.Job, error) {
	region := "us-central1"
	displayName1 := "script 1"
	displayName2 := "barrier 1"
	displayName3 := "script 2"

	ctx := context.Background()
	batchClient, err := batch.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("batchClient error: %w", err)
	}
	defer batchClient.Close()

	runn1 := &batchpb.Runnable{
		Executable: &batchpb.Runnable_Script_{
			Script: &batchpb.Runnable_Script{
				Command: &batchpb.Runnable_Script_Text{
					Text: "echo Hello world from script 1 for task ${BATCH_TASK_INDEX}",
				},
			},
		},
		DisplayName: displayName1,
	}

	runn2 := &batchpb.Runnable{
		Executable: &batchpb.Runnable_Barrier_{
			Barrier: &batchpb.Runnable_Barrier{},
		},
		DisplayName: displayName2,
	}

	runn3 := &batchpb.Runnable{
		Executable: &batchpb.Runnable_Script_{
			Script: &batchpb.Runnable_Script{
				Command: &batchpb.Runnable_Script_Text{
					Text: "echo Hello world from script 2 for task ${BATCH_TASK_INDEX}",
				},
			},
		},
		DisplayName: displayName3,
	}

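	// Writes a structured log with the batch/custom/event field, which produces a
	// custom status event. Replace DESCRIPTION with a description for the event,
	// for example, halfway done.
	runn4 := &batchpb.Runnable{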
		Executable: &batchpb.Runnable_Script_{
			Script: &batchpb.Runnable_Script{
				Command: &batchpb.Runnable_Script_Text{
					Text: "sleep 30; echo '{\"batch/custom/event\": \"DESCRIPTION\"}'; sleep 30",
				},
			},
		},
	}

	taskSpec := &batchpb.TaskSpec{
		ComputeResource: &batchpb.ComputeResource{
			// CpuMilli is the CPU requirement in milliCPUs (thousandths of a vCPU).
			// A value of 2000 means the task requires 2 whole CPUs.
			CpuMilli:  2000,
			MemoryMib: 16,
		},
		MaxRunDuration: &durationpb.Duration{
			Seconds: 3600,
		},
		MaxRetryCount: 2,
		Runnables:     []*batchpb.Runnable{runn1, runn2, runn3, runn4},
	}

	taskGroups := []*batchpb.TaskGroup{
		{
			TaskCount: 4,
			TaskSpec:  taskSpec,
		},
	}

	labels := map[string]string{"env": "testing", "type": "container"}

	// Policies are used to define on what kind of virtual machines the tasks will run on.
	// In this case, we tell the system to use "e2-standard-4" machine type.
	// Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
	allocationPolicy := &batchpb.AllocationPolicy{
		Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
			PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
				Policy: &batchpb.AllocationPolicy_InstancePolicy{
					MachineType: "e2-standard-4",
				},
			},
		}},
	}

	// We use Cloud Logging as it's an out of the box available option
	logsPolicy := &batchpb.LogsPolicy{
		Destination: batchpb.LogsPolicy_CLOUD_LOGGING,
	}

	job := &batchpb.Job{
		Name:             jobName,
		TaskGroups:       taskGroups,
		AllocationPolicy: allocationPolicy,
		Labels:           labels,
		LogsPolicy:       logsPolicy,
	}

	request := &batchpb.CreateJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, region),
		JobId:  jobName,
		Job:    job,
	}

	createdJob, err := batchClient.CreateJob(ctx, request)
	if err != nil {
		return nil, fmt.Errorf("unable to create job: %w", err)
	}

	fmt.Fprintf(w, "Job created: %v\n", createdJob)
	return createdJob, nil
}

Java


import com.google.cloud.batch.v1.BatchServiceClient;
import com.google.cloud.batch.v1.CreateJobRequest;
import com.google.cloud.batch.v1.Job;
import com.google.cloud.batch.v1.LogsPolicy;
import com.google.cloud.batch.v1.LogsPolicy.Destination;
import com.google.cloud.batch.v1.Runnable;
import com.google.cloud.batch.v1.Runnable.Barrier;
import com.google.cloud.batch.v1.Runnable.Script;
import com.google.cloud.batch.v1.TaskGroup;
import com.google.cloud.batch.v1.TaskSpec;
import com.google.protobuf.Duration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CreateBatchCustomEvent {

  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    // Project ID or project number of the Google Cloud project you want to use.
    String projectId = "YOUR_PROJECT_ID";
    // Name of the region you want to use to run the job. Regions that are
    // available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
    String region = "europe-central2";
    // The name of the job that will be created.
    // It needs to be unique for each project and region pair.
    String jobName = "JOB_NAME";
    // Name of the runnable, which must be unique
    // within the job. For example: script 1, barrier 1, and script 2.
    String displayName1 = "script 1";
    String displayName2 = "barrier 1";
    String displayName3 = "script 2";

    createBatchCustomEvent(projectId, region, jobName, displayName1, displayName2, displayName3);
  }

  // Configure custom status events, which describe a job's runnables,
  // when you create and run a Batch job.
  public static Job createBatchCustomEvent(String projectId, String region, String jobName,
                                           String displayName1, String displayName2,
                                           String displayName3)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (BatchServiceClient batchServiceClient = BatchServiceClient.create()) {
      TaskSpec task = TaskSpec.newBuilder()
              // Jobs can be divided into tasks. Each task in this job runs the runnables below.
              .addAllRunnables(buildRunnables(displayName1, displayName2, displayName3))
              .setMaxRetryCount(2)
              .setMaxRunDuration(Duration.newBuilder().setSeconds(3600).build())
              .build();

      // Tasks are grouped inside a job using TaskGroups.
      // Currently, it's possible to have only one task group.
      TaskGroup taskGroup = TaskGroup.newBuilder()
          .setTaskCount(3)
          .setParallelism(3)
          .setTaskSpec(task)
          .build();

      Job job =
          Job.newBuilder()
              .addTaskGroups(taskGroup)
              .putLabels("env", "testing")
              .putLabels("type", "script")
              // We use Cloud Logging as it's an out of the box available option.
              .setLogsPolicy(
                  LogsPolicy.newBuilder().setDestination(Destination.CLOUD_LOGGING))
              .build();

      CreateJobRequest createJobRequest =
          CreateJobRequest.newBuilder()
              // The job's parent is the region in which the job will run.
              .setParent(String.format("projects/%s/locations/%s", projectId, region))
              .setJob(job)
              .setJobId(jobName)
              .build();

      Job result =
          batchServiceClient
              .createJobCallable()
              .futureCall(createJobRequest)
              .get(5, TimeUnit.MINUTES);

      System.out.printf("Successfully created the job: %s%n", result.getName());

      return result;
    }
  }

  // Create runnables with custom scripts
  private static Iterable<Runnable> buildRunnables(String displayName1, String displayName2,
                                                   String displayName3) {
    List<Runnable> runnables = new ArrayList<>();

    // Define what will be done as part of the job.
    runnables.add(Runnable.newBuilder()
        .setDisplayName(displayName1)
        .setScript(
            Script.newBuilder()
                .setText(
                    "echo Hello world from script 1 for task ${BATCH_TASK_INDEX}")
                // You can also run a script from a file. The script file must already
                // exist on the VM that runs the job.
                // setText() and setPath() are mutually exclusive.
                // .setPath("/tmp/test.sh")
                )
        .build());

    runnables.add(Runnable.newBuilder()
            .setDisplayName(displayName2)
            .setBarrier(Barrier.newBuilder())
            .build());

    runnables.add(Runnable.newBuilder()
        .setDisplayName(displayName3)
        .setScript(
            Script.newBuilder()
                .setText("echo Hello world from script 2 for task ${BATCH_TASK_INDEX}"))
        .build());

    runnables.add(Runnable.newBuilder()
        .setScript(
            Script.newBuilder()
                // Replace DESCRIPTION with a description
                // for the custom status event—for example, halfway done.
                .setText("sleep 30; echo '{\"batch/custom/event\": \"DESCRIPTION\"}'; sleep 30"))
        .build());

    return runnables;
  }
}

Node.js

// Imports the Batch library
const batchLib = require('@google-cloud/batch');
const batch = batchLib.protos.google.cloud.batch.v1;

// Instantiates a client
const batchClient = new batchLib.v1.BatchServiceClient();

/**
 * TODO(developer): Update these variables before running the sample.
 */
// Project ID or project number of the Google Cloud project you want to use.
const projectId = 'YOUR_PROJECT_ID';
// Name of the region you want to use to run the job. Regions that are
// available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
const region = 'europe-central2';
// The name of the job that will be created.
// It needs to be unique for each project and region pair.
const jobName = 'batch-custom-events-job';
// Name of the runnable, which must be unique
// within the job. For example: script 1, barrier 1, and script 2.
const displayName1 = 'script 1';
const displayName2 = 'barrier 1';
const displayName3 = 'script 2';

// Create runnables with custom scripts
const runnable1 = new batch.Runnable({
  displayName: displayName1,
  script: new batch.Runnable.Script({
    // Script runnables take their commands in the text (or path) field.
    text: 'echo Hello world from script 1 for task ${BATCH_TASK_INDEX}.',
  }),
});

const runnable2 = new batch.Runnable({
  displayName: displayName2,
  barrier: new batch.Runnable.Barrier(),
});

const runnable3 = new batch.Runnable({
  displayName: displayName3,
  script: new batch.Runnable.Script({
    // Replace DESCRIPTION with a description
    // for the custom status event, for example, halfway done.
    text: 'sleep 30; echo \'{"batch/custom/event": "DESCRIPTION"}\'; sleep 30',
  }),
});

const task = new batch.TaskSpec({
  runnables: [runnable1, runnable2, runnable3],
  maxRetryCount: 2,
  maxRunDuration: {seconds: 3600},
});

// Tasks are grouped inside a job using TaskGroups.
const group = new batch.TaskGroup({
  taskCount: 3,
  taskSpec: task,
});

const job = new batch.Job({
  name: jobName,
  taskGroups: [group],
  labels: {env: 'testing', type: 'script'},
  // We use Cloud Logging as it's an option available out of the box
  logsPolicy: new batch.LogsPolicy({
    destination: batch.LogsPolicy.Destination.CLOUD_LOGGING,
  }),
});
// The job's parent is the project and region in which the job will run
const parent = `projects/${projectId}/locations/${region}`;

async function callCreateBatchCustomEvents() {
  // Construct request
  const request = {
    parent,
    jobId: jobName,
    job,
  };

  // Run request
  const [response] = await batchClient.createJob(request);
  console.log(JSON.stringify(response));
}

callCreateBatchCustomEvents().catch(console.error);

Python

from google.cloud import batch_v1


def create_job_with_status_events(
    project_id: str, region: str, job_name: str
) -> batch_v1.Job:
    """
    This method shows the creation of a Batch job with custom status events that describe its runnables.
    Within the method, the state of a runnable is described by defining its display name.
    The script text is modified to change the commands that are executed, and barriers are adjusted
    to synchronize tasks at specific points.

    Args:
        project_id (str): project ID or project number of the Cloud project you want to use.
        region (str): name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/locations
        job_name (str): the name of the job that will be created.
            It needs to be unique for each project and region pair.

    Returns:
        A job object representing the job created with additional runnables and custom events.
    """
    client = batch_v1.BatchServiceClient()

    # Executes a simple script that prints a message.
    runn1 = batch_v1.Runnable()
    runn1.display_name = "Script 1"
    runn1.script.text = "echo Hello world from Script 1 for task ${BATCH_TASK_INDEX}"

    # Acts as a barrier to synchronize the execution of subsequent runnables.
    runn2 = batch_v1.Runnable()
    runn2.display_name = "Barrier 1"
    runn2.barrier = batch_v1.Runnable.Barrier({"name": "hello-barrier"})

    # Executes another script that prints a message, intended to run after the barrier.
    runn3 = batch_v1.Runnable()
    runn3.display_name = "Script 2"
    runn3.script.text = "echo Hello world from Script 2 for task ${BATCH_TASK_INDEX}"

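    # Executes a script that imitates a delay and creates a custom event for monitoring purposes.
    # Replace EVENT_DESCRIPTION with a description for the custom status event, for example, halfway done.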
    runn4 = batch_v1.Runnable()
    runn4.script.text = (
        'sleep 30; echo \'{"batch/custom/event": "EVENT_DESCRIPTION"}\'; sleep 30'
    )

    # Jobs can be divided into tasks. Each task runs all of the runnables above.
    task = batch_v1.TaskSpec()
    # Assigning a list of runnables to the task.
    task.runnables = [runn1, runn2, runn3, runn4]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milliCPUs (thousandths of a vCPU). This means the task requires 2 whole CPUs.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()

    group.task_count = 4
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # In this case, we tell the system to use "e2-standard-4" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)

After the example job finishes running, the resulting custom status events for each task are similar to the following:

statusEvents:
  ...
  - description: 'script at index #0 with display name [DISPLAY_NAME1] started.'
    eventTime: '...'
    type: RUNNABLE_EVENT
  - description: 'script at index #0 with display name [DISPLAY_NAME1] finished with exit
      code 0.'
    eventTime: '...'
    type: RUNNABLE_EVENT
  - description: 'barrier at index #1 with display name [DISPLAY_NAME2] reached.'
    eventTime: '...'
    type: BARRIER_REACHED_EVENT
  - description: 'script at index #2 with display name [DISPLAY_NAME3] started.'
    eventTime: '...'
    type: RUNNABLE_EVENT
  - description: 'script at index #2 with display name [DISPLAY_NAME3] finished with exit
      code 0.'
    eventTime: '...'
    type: RUNNABLE_EVENT
  ...
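To analyze these events programmatically, you could parse their descriptions. The following Python sketch assumes the description wording shown in the sample output above; the regular expression is inferred from those samples and is not a documented or guaranteed format, so treat descriptions it does not recognize as opaque text. The function name is hypothetical.

```python
import re
from typing import Dict, Optional

# Inferred from the sample descriptions on this page, not from a published format.
_PATTERN = re.compile(
    r"(?P<kind>\w+) at index #(?P<index>\d+) with display name \[(?P<name>[^\]]*)\] "
    r"(?P<phase>started|reached|finished with exit code (?P<exit_code>-?\d+))\."
)


def parse_event_description(description: str) -> Optional[Dict]:
    """Return kind, index, display name, phase, and exit code, or None if unrecognized."""
    # Collapse whitespace so YAML-folded descriptions ("exit\n code 0.") still match.
    match = _PATTERN.fullmatch(" ".join(description.split()))
    if not match:
        return None
    exit_code = match["exit_code"]
    return {
        "kind": match["kind"],
        "index": int(match["index"]),
        "display_name": match["name"],
        "phase": "finished" if match["phase"].startswith("finished") else match["phase"],
        "exit_code": int(exit_code) if exit_code is not None else None,
    }
```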

Indicate important runtime events

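You can configure custom status events that indicate when an important event occurs while a runnable is running. To do this, configure the runnable to write a structured task log that defines a string for the Batch custom status event (batch/custom/event) field.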

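If a container runnable or script runnable writes a structured task log that defines the batch/custom/event JSON field, a custom status event is produced at that time. Although you can configure the structured task log to include other fields, the custom status event includes only the string for the batch/custom/event field.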
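For a runnable that runs a Python program (for example, inside a container runnable), a minimal sketch of writing such a structured log line follows. It assumes that the program's standard output is collected as task logs and that the job sends logs to Cloud Logging, as in the examples on this page; the function name is hypothetical. Extra fields are written to the log line, but only the batch/custom/event string appears in the custom status event.

```python
import json
from typing import Optional, TextIO


def emit_custom_event(description: str, stream: Optional[TextIO] = None, **extra_fields) -> str:
    """Write one JSON log line that defines batch/custom/event, and return the line."""
    entry = dict(extra_fields)
    # Only this field's string is used as the custom status event description.
    entry["batch/custom/event"] = description
    line = json.dumps(entry)
    # print() writes to sys.stdout when stream is None; flush so the line is not held in a buffer.
    print(line, file=stream, flush=True)
    return line


if __name__ == "__main__":
    emit_custom_event("halfway done")
```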

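To create and run a job with custom status events that indicate when important events occur, configure one or more runnables to write a structured log by printing JSON, and define the batch/custom/event field as part of the log: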

...
"runnables": [
  {
    ...
    "echo '{\"batch/custom/event\":\"EVENT_DESCRIPTION\"}'"
    ...
  }
]
...
"logsPolicy": {
  "destination": "CLOUD_LOGGING"
}
...

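For example, a job with a custom status event that indicates when an important event occurs might have a JSON configuration file similar to the following: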

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "script": {
              "text": "sleep 30; echo '{\"batch/custom/event\": \"EVENT_DESCRIPTION\"}'; sleep 30"
            }
          }
        ]
      },
      "taskCount": 3
    }
  ],
  "logsPolicy": {
    "destination": "CLOUD_LOGGING"
  }
}

EVENT_DESCRIPTION 替换为自定义状态事件的说明,例如 halfway done

After the example job finishes running, the resulting custom status events for each task are similar to the following:

statusEvents:
  ...
  - description: EVENT_DESCRIPTION
    eventTime: '...'
    type: RUNNABLE_CUSTOM_EVENT
  ...
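When you review a task's history, you might want only these custom events. The following Python sketch filters them from a list of status events. It assumes you have already retrieved the statusEvents entries as dictionaries with the description, eventTime, and type keys shown above (for example, by parsing the output of gcloud batch tasks describe); the function name is hypothetical.

```python
from typing import Dict, List


def custom_event_descriptions(status_events: List[Dict]) -> List[str]:
    """Return the descriptions of RUNNABLE_CUSTOM_EVENT entries, keeping their order."""
    return [event["description"] for event in status_events
            if event.get("type") == "RUNNABLE_CUSTOM_EVENT"]
```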

What's next