Pub/Sub 상태 알림을 보내는 작업 만들기 및 실행

이 문서에서는 Pub/Sub 알림을 보내는 Batch 작업을 만드는 방법을 설명합니다. Pub/Sub를 사용하면 작업 또는 태스크 상태가 변경되거나 작업 또는 태스크가 특정 상태로 전환되면 알림을 받을 수 있습니다. 자세한 내용은 알림을 사용하여 작업 모니터링을 참조하세요.

시작하기 전에

  1. Batch를 사용한 적이 없으면 Batch 시작하기를 검토하고 프로젝트 및 사용자 기본 요건을 완료하여 Batch를 사용 설정하세요.
  2. Batch 알림에 대한 Pub/Sub 주제를 만들거나 식별합니다.
  3. 알림을 수신하고 사용하도록 구독을 구성합니다.

필요한 역할

  • 알림을 보내는 작업을 만들고 실행하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.

    역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

    커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

  • 작업의 서비스 계정에 대한 기본 구성을 사용하지 않는 경우 필요한 권한이 있는지 확인합니다.

    작업의 서비스 계정에 Pub/Sub 알림을 게시하는 데 필요한 권한을 확보하려면 관리자에게 작업의 서비스 계정에 Pub/Sub 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher) IAM 역할을 부여해 달라고 요청하세요.

  • 작업에서 작업과 다른 프로젝트에 있는 Pub/Sub 주제에 대한 알림을 게시하도록 하려면 작업의 프로젝트에 대한 Batch 서비스 에이전트에 해당 주제에 게시할 수 있는 권한을 부여해야 합니다.

    작업의 프로젝트에 대한 Batch 서비스 에이전트에서 다른 프로젝트에서 Pub/Sub 주제에 대한 Pub/Sub 알림을 게시하는 데 필요한 권한을 확보하려면 관리자에게 작업의 프로젝트에 대한 Batch 서비스 에이전트에 Pub/Sub 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher) IAM 역할을 부여해 달라고 요청하세요.

알림을 보내는 작업 만들기 및 실행

다음을 수행하여 Pub/Sub 알림을 보내는 Batch 작업을 만들 수 있습니다.

gcloud

Google Cloud CLI를 사용하여 JSON 파일의 본문에 notifications 필드 및 하나 이상의 jobNotification 객체를 포함하는 작업을 만듭니다.

{
...
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        ATTRIBUTES
      }
    }
  ]
...
}

다음을 바꿉니다.

  • PROJECT_ID: Pub/Sub 주제가 포함된 프로젝트의 프로젝트 ID
  • TOPIC_ID: Pub/Sub 알림을 사용 설정할 때 만든 주제의 Pub/Sub 주제 ID
  • ATTRIBUTES: 다음 속성 중 하나 이상을 지정합니다. 각 속성을 사용하면 작업 또는 모든 태스크의 상태에 대한 알림을 받을 수 있습니다.

    • 모든 작업 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "JOB_STATE_CHANGED"
      
    • 특정 작업 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "JOB_STATE_CHANGED",
      "newJobState": "JOB_STATE"
      

      JOB_STATE를 다음 작업 상태 중 하나로 바꿉니다.

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      작업 상태에 대한 자세한 내용은 작업 수명 주기를 참조하세요.

    • 모든 태스크 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "TASK_STATE_CHANGED"
      
    • 특정 태스크 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "TASK_STATE_CHANGED",
      "newTaskState": "TASK_STATE"
      

      TASK_STATE를 다음 태스크 상태 중 하나로 바꿉니다.

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      태스크 상태에 대한 자세한 내용은 작업 수명 주기를 참조하세요.

예를 들어 모든 작업 상태 변경과 작업 실패에 대한 알림을 받으려고 합니다. 이를 위해 다음과 유사한 JSON 구성 파일이 있을 수 있습니다.

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "script": {
              "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
            }
          }
        ]
      },
      "taskCount": 3,
    }
  ],
  "logsPolicy": {
      "destination": "CLOUD_LOGGING"
  },
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "JOB_STATE_CHANGED"
      }
    },
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "TASK_STATE_CHANGED",
        "newTaskState": "FAILED"
      }
    }
  ]
}

API

REST API를 사용하여 JSON 파일의 본문에 notifications 필드 및 하나 이상의 jobNotification 객체를 포함하는 작업을 만듭니다.

{
...
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        ATTRIBUTES
      }
    }
  ]
...
}

다음을 바꿉니다.

  • PROJECT_ID: Pub/Sub 주제가 포함된 프로젝트의 프로젝트 ID
  • TOPIC_ID: Pub/Sub 알림을 사용 설정할 때 만든 주제의 Pub/Sub 주제 ID
  • ATTRIBUTES: 다음 속성 중 하나 이상을 지정합니다. 각 속성을 사용하면 작업 또는 모든 태스크의 상태에 대한 알림을 받을 수 있습니다.

    • 모든 작업 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "JOB_STATE_CHANGED"
      
    • 특정 작업 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "JOB_STATE_CHANGED",
      "newJobState": "JOB_STATE"
      

      JOB_STATE를 다음 작업 상태 중 하나로 바꿉니다.

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      작업 상태에 대한 자세한 내용은 작업 수명 주기를 참조하세요.

    • 모든 태스크 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "TASK_STATE_CHANGED"
      
    • 특정 태스크 상태 변경에 대한 알림을 받으려면 다음을 지정합니다.

      "type": "TASK_STATE_CHANGED",
      "newTaskState": "TASK_STATE"
      

      TASK_STATE를 다음 태스크 상태 중 하나로 바꿉니다.

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      태스크 상태에 대한 자세한 내용은 작업 수명 주기를 참조하세요.

예를 들어 모든 작업 상태 변경과 작업 실패에 대한 알림을 받으려고 합니다. 이를 위해 다음과 유사한 JSON 구성 파일이 있을 수 있습니다.

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "script": {
              "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
            }
          }
        ]
      },
      "taskCount": 3,
    }
  ],
  "logsPolicy": {
      "destination": "CLOUD_LOGGING"
  },
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "JOB_STATE_CHANGED"
      }
    },
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "TASK_STATE_CHANGED",
        "newTaskState": "FAILED"
      }
    }
  ]
}

Go

import (
	"context"
	"fmt"
	"io"

	batch "cloud.google.com/go/batch/apiv1"
	"cloud.google.com/go/batch/apiv1/batchpb"
	durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// Creates and runs a job with configured notifications
func createJobWithNotifications(w io.Writer, projectID, region, jobName, topicName string) (*batchpb.Job, error) {

	ctx := context.Background()
	batchClient, err := batch.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("batchClient error: %w", err)
	}
	defer batchClient.Close()

	script := &batchpb.Runnable_Script_{
		Script: &batchpb.Runnable_Script{
			Command: &batchpb.Runnable_Script_Text{
				Text: "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
			},
		},
	}

	taskSpec := &batchpb.TaskSpec{
		ComputeResource: &batchpb.ComputeResource{
			// CpuMilli is milliseconds per cpu-second. This means the task requires 2 whole CPUs.
			CpuMilli:  2000,
			MemoryMib: 16,
		},
		MaxRunDuration: &durationpb.Duration{
			Seconds: 3600,
		},
		MaxRetryCount: 2,
		Runnables: []*batchpb.Runnable{{
			Executable: script,
		}},
	}

	taskGroups := []*batchpb.TaskGroup{
		{
			TaskCount: 4,
			TaskSpec:  taskSpec,
		},
	}

	labels := map[string]string{"env": "testing", "type": "container"}

	// Policies are used to define on what kind of virtual machines the tasks will run on.
	// In this case, we tell the system to use "e2-standard-4" machine type.
	// Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
	allocationPolicy := &batchpb.AllocationPolicy{
		Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
			PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
				Policy: &batchpb.AllocationPolicy_InstancePolicy{
					MachineType: "e2-standard-4",
				},
			},
		}},
	}

	// We use Cloud Logging as it's an out of the box available option
	logsPolicy := &batchpb.LogsPolicy{
		Destination: batchpb.LogsPolicy_CLOUD_LOGGING,
	}

	notifications := []*batchpb.JobNotification{
		{
			PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
			Message: &batchpb.JobNotification_Message{
				Type: batchpb.JobNotification_JOB_STATE_CHANGED,
			},
		},
		{
			PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
			Message: &batchpb.JobNotification_Message{
				Type:         batchpb.JobNotification_TASK_STATE_CHANGED,
				NewTaskState: batchpb.TaskStatus_FAILED,
			},
		},
	}

	job := &batchpb.Job{
		Name:             jobName,
		TaskGroups:       taskGroups,
		AllocationPolicy: allocationPolicy,
		Labels:           labels,
		Notifications:    notifications,
		LogsPolicy:       logsPolicy,
	}

	request := &batchpb.CreateJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, region),
		JobId:  jobName,
		Job:    job,
	}

	created_job, err := batchClient.CreateJob(ctx, request)
	if err != nil {
		return nil, fmt.Errorf("unable to create job: %w", err)
	}

	fmt.Fprintf(w, "Job created: %v\n", created_job)
	return created_job, nil
}

자바


import com.google.cloud.batch.v1.BatchServiceClient;
import com.google.cloud.batch.v1.CreateJobRequest;
import com.google.cloud.batch.v1.Job;
import com.google.cloud.batch.v1.JobNotification;
import com.google.cloud.batch.v1.JobNotification.Message;
import com.google.cloud.batch.v1.JobNotification.Type;
import com.google.cloud.batch.v1.LogsPolicy;
import com.google.cloud.batch.v1.LogsPolicy.Destination;
import com.google.cloud.batch.v1.Runnable;
import com.google.cloud.batch.v1.Runnable.Script;
import com.google.cloud.batch.v1.TaskGroup;
import com.google.cloud.batch.v1.TaskSpec;
import com.google.cloud.batch.v1.TaskStatus.State;
import com.google.common.collect.Lists;
import com.google.protobuf.Duration;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CreateBatchNotification {

  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    // Project ID or project number of the Google Cloud project you want to use.
    String projectId = "YOUR_PROJECT_ID";
    // Name of the region you want to use to run the job. Regions that are
    // available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
    String region = "europe-central2";
    // The name of the job that will be created.
    // It needs to be unique for each project and region pair.
    String jobName = "JOB_NAME";
    // The Pub/Sub topic ID to send the notifications to.
    String topicId = "TOPIC_ID";

    createBatchNotification(projectId, region, jobName, topicId);
  }

  // Create a Batch job that sends notifications to Pub/Sub
  public static Job createBatchNotification(String projectId, String region, String jobName,
                                            String topicId)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (BatchServiceClient batchServiceClient = BatchServiceClient.create()) {
      // Define what will be done as part of the job.
      Runnable runnable =
          Runnable.newBuilder()
              .setScript(
                  Script.newBuilder()
                      .setText(
                          "echo Hello world! This is task ${BATCH_TASK_INDEX}. "
                                  + "This job has a total of ${BATCH_TASK_COUNT} tasks.")
                      // You can also run a script from a file. Just remember, that needs to be a
                      // script that's already on the VM that will be running the job.
                      // Using setText() and setPath() is mutually exclusive.
                      // .setPath("/tmp/test.sh")
                      .build())
              .build();

      TaskSpec task = TaskSpec.newBuilder()
              // Jobs can be divided into tasks. In this case, we have only one task.
              .addRunnables(runnable)
              .setMaxRetryCount(2)
              .setMaxRunDuration(Duration.newBuilder().setSeconds(3600).build())
              .build();

      // Tasks are grouped inside a job using TaskGroups.
      // Currently, it's possible to have only one task group.
      TaskGroup taskGroup = TaskGroup.newBuilder()
          .setTaskCount(3)
          .setParallelism(1)
          .setTaskSpec(task)
          .build();

      Job job =
          Job.newBuilder()
              .addTaskGroups(taskGroup)
              .addAllNotifications(buildNotifications(projectId, topicId))
              .putLabels("env", "testing")
              .putLabels("type", "script")
              // We use Cloud Logging as it's an out of the box available option.
              .setLogsPolicy(
                  LogsPolicy.newBuilder().setDestination(Destination.CLOUD_LOGGING))
              .build();

      CreateJobRequest createJobRequest =
          CreateJobRequest.newBuilder()
              // The job's parent is the region in which the job will run.
              .setParent(String.format("projects/%s/locations/%s", projectId, region))
              .setJob(job)
              .setJobId(jobName)
              .build();

      Job result =
          batchServiceClient
              .createJobCallable()
              .futureCall(createJobRequest)
              .get(5, TimeUnit.MINUTES);

      System.out.printf("Successfully created the job: %s", result.getName());

      return result;
    }
  }

  // Creates notification configurations to send messages to Pub/Sub when the state is changed
  private static Iterable<JobNotification> buildNotifications(String projectId, String topicId) {
    String pubsubTopic = String.format("projects/%s/topics/%s", projectId, topicId);

    JobNotification jobStateChanged = JobNotification.newBuilder()
            .setPubsubTopic(pubsubTopic)
            .setMessage(Message.newBuilder().setType(Type.JOB_STATE_CHANGED))
            .build();

    JobNotification taskStateChanged = JobNotification.newBuilder()
            .setPubsubTopic(pubsubTopic)
            .setMessage(Message.newBuilder()
                    .setType(Type.TASK_STATE_CHANGED)
                    .setNewTaskState(State.FAILED))
            .build();

    return Lists.newArrayList(jobStateChanged, taskStateChanged);
  }
}

Node.js

// Imports the Batch library
const batchLib = require('@google-cloud/batch');
const batch = batchLib.protos.google.cloud.batch.v1;

// Instantiates a client
const batchClient = new batchLib.v1.BatchServiceClient();

/**
 * TODO(developer): Update these variables before running the sample.
 */
// Project ID or project number of the Google Cloud project you want to use.
const PROJECT_ID = await batchClient.getProjectId();
// Name of the region you want to use to run the job. Regions that are
// available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
const REGION = 'europe-central2';
// The name of the job that will be created.
// It needs to be unique for each project and region pair.
const JOB_NAME = 'job-name-batch-notifications';
// The Pub/Sub topic ID to send the notifications to.
const TOPIC_ID = 'topic-id';

// Define what will be done as part of the job.
const task = new batch.TaskSpec();
const runnable = new batch.Runnable();
runnable.script = new batch.Runnable.Script();
runnable.script.commands = [
  '-c',
  'echo Hello world! This is task ${BATCH_TASK_INDEX}.',
];
task.runnables = [runnable];
task.maxRetryCount = 2;
task.maxRunDuration = {seconds: 3600};

// Tasks are grouped inside a job using TaskGroups.
const group = new batch.TaskGroup();
group.taskCount = 3;
group.taskSpec = task;

// Create batch notification when job state changed
const notification1 = new batch.JobNotification();
notification1.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
notification1.message = {
  type: 'JOB_STATE_CHANGED',
};

// Create batch notification when task state changed
const notification2 = new batch.JobNotification();
notification2.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
notification2.message = {
  type: 'TASK_STATE_CHANGED',
  newTaskState: 'FAILED',
};

const job = new batch.Job();
job.name = JOB_NAME;
job.taskGroups = [group];
job.notifications = [notification1, notification2];
job.labels = {env: 'testing', type: 'script'};
// We use Cloud Logging as it's an option available out of the box
job.logsPolicy = new batch.LogsPolicy();
job.logsPolicy.destination = batch.LogsPolicy.Destination.CLOUD_LOGGING;
// The job's parent is the project and region in which the job will run
const parent = `projects/${PROJECT_ID}/locations/${REGION}`;

async function callCreateBatchNotifications() {
  // Construct request
  const request = {
    parent,
    jobId: JOB_NAME,
    job,
  };

  // Run request
  const [response] = await batchClient.createJob(request);
  console.log(JSON.stringify(response));
}

await callCreateBatchNotifications();

Python

from google.cloud import batch_v1


def create_with_pubsub_notification_job(
    project_id: str, region: str, job_name: str, topic_name: str
) -> batch_v1.Job:
    """
    This method shows how to create a sample Batch Job that will run
    a simple command inside a container on Cloud Compute instances.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        region: name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/locations
        job_name: the name of the job that will be created.
            It needs to be unique for each project and region pair.
        topic_name: the name of the Pub/Sub topic to which the notification will be sent.
            The topic should be created in GCP Pub/Sub before running this method.
            The procedure for creating a topic is listed here: https://cloud.google.com/pubsub/docs/create-topic

    Returns:
        A job object representing the job created.
    """

    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    # Jobs can be divided into tasks. In this case, we have only one task.
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()
    group.task_count = 4
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # In this case, we tell the system to use "e2-standard-4" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    # Configuring the first notification
    notification1 = batch_v1.JobNotification()
    notification1.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
    # Define the message that will be sent to the topic
    first_massage = batch_v1.JobNotification.Message()
    # Specify the new job state that will trigger the notification
    # In this case, the notification is triggered when the job state changes to SUCCEEDED
    first_massage.type_ = batch_v1.JobNotification.Type.JOB_STATE_CHANGED
    first_massage.new_job_state = batch_v1.JobStatus.State.SUCCEEDED
    # Assign the message to the notification
    notification1.message = first_massage

    # Configuring the second notification
    notification2 = batch_v1.JobNotification()
    notification2.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
    second_message = batch_v1.JobNotification.Message()
    second_message.type_ = batch_v1.JobNotification.Type.TASK_STATE_CHANGED
    second_message.new_task_state = batch_v1.TaskStatus.State.FAILED
    notification2.message = second_message

    # Assign a list of notifications to the job.
    job.notifications = [notification1, notification2]

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run
    create_request.parent = f"projects/{project_id}/locations/{region}"
    return client.create_job(create_request)

작업 실행이 시작된 후 해당 알림을 사용할 수 있습니다. 예를 들어 작업의 Pub/Sub 주제에 알림을 BigQuery로 스트리밍하는 구독이 있는 경우 BigQuery에서 Pub/Sub 알림을 분석할 수 있습니다.

다음 단계