Job erstellen und ausführen, der Pub/Sub-Statusbenachrichtigungen sendet

In diesem Dokument wird beschrieben, wie Sie einen Batch-Job erstellen, der Pub/Sub-Benachrichtigungen sendet. Mit Pub/Sub können Sie sich benachrichtigen lassen, wenn sich der Status eines Jobs oder einer Aufgabe ändert oder wenn ein Job oder eine Aufgabe einen bestimmten Status erreicht. Weitere Informationen finden Sie unter Jobs mithilfe von Benachrichtigungen überwachen.

Hinweise

  1. Wenn Sie Batch noch nicht verwendet haben, lesen Sie den Hilfeartikel Batch-Dateien erstellen und ausführen und aktivieren Sie Batch, indem Sie die Voraussetzungen für Projekte und Nutzer erfüllen.
  2. Erstellen oder identifizieren Sie ein Pub/Sub-Thema für Batchbenachrichtigungen.
  3. Konfigurieren Sie ein Abo, um die Benachrichtigungen zu erhalten und zu verwenden.

Erforderliche Rollen

  • Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen und Ausführen eines Jobs benötigen, über den Benachrichtigungen gesendet werden:

    Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

    Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

  • Sofern Sie nicht die Standardkonfiguration für das Dienstkonto des Jobs verwenden, muss es die erforderlichen Berechtigungen haben.

    Damit das Dienstkonto des Jobs die erforderlichen Berechtigungen zum Veröffentlichen von Pub/Sub-Benachrichtigungen hat, bitten Sie Ihren Administrator, dem Dienstkonto des Jobs die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) für Ihr Pub/Sub-Thema zu erteilen.

  • Wenn ein Job Benachrichtigungen in einem Pub/Sub-Thema veröffentlichen soll, das sich in einem anderen Projekt als dem des Jobs befindet, muss dem Batch-Dienst-Agent für das Projekt des Jobs die Berechtigung zum Veröffentlichen in diesem Thema erteilt werden.

    Damit der Batch-Dienst-Agent für das Projekt des Jobs die erforderlichen Berechtigungen zum Veröffentlichen von Pub/Sub-Benachrichtigungen in einem Pub/Sub-Thema in einem anderen Projekt hat, bitten Sie Ihren Administrator, dem Batch-Dienst-Agent für das Projekt des Jobs die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) für das Pub/Sub-Thema zuzuweisen.

Job erstellen und ausführen, der Benachrichtigungen sendet

So erstellen Sie einen Batchjob, der Pub/Sub-Benachrichtigungen sendet:

gcloud

Verwenden Sie die Google Cloud CLI, um einen Job zu erstellen, der das Feld notifications und ein oder mehrere jobNotification-Objekte im Hauptteil der JSON-Datei enthält:

{
...
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        ATTRIBUTES
      }
    }
  ]
...
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Projekt-ID des Projekts, das das Pub/Sub-Thema enthält.
  • TOPIC_ID: die Pub/Sub-Themen-ID des Themas, das Sie beim Aktivieren von Pub/Sub-Benachrichtigungen erstellt haben.
  • ATTRIBUTES: Geben Sie eines oder mehrere der folgenden Attribute an, mit denen Sie Benachrichtigungen über den Status des Jobs oder aller seiner Aufgaben erhalten.

    • Geben Sie Folgendes an, um Benachrichtigungen zu allen Änderungen des Jobstatus zu erhalten:

      "type": "JOB_STATE_CHANGED"
      
    • Geben Sie Folgendes an, um Benachrichtigungen zu einer bestimmten Änderung des Jobstatus zu erhalten:

      "type": "JOB_STATE_CHANGED",
      "newJobState": "JOB_STATE"
      

      Ersetzen Sie JOB_STATE durch einen der folgenden Jobstatus:

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      Weitere Informationen zu Jobstatus finden Sie unter Joblebenszyklus.

    • Geben Sie Folgendes an, um Benachrichtigungen zu allen Änderungen des Aufgabenstatus zu erhalten:

      "type": "TASK_STATE_CHANGED"
      
    • Geben Sie für Benachrichtigungen zu bestimmten Änderungen des Aufgabenstatus Folgendes an:

      "type": "TASK_STATE_CHANGED",
      "newTaskState": "TASK_STATE"
      

      Ersetzen Sie TASK_STATE durch einen der folgenden Aufgabenstatus:

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      Weitere Informationen zu Aufgabenstatus finden Sie unter Joblebenszyklus.

Angenommen, Sie möchten Benachrichtigungen zu allen Änderungen des Jobstatus und jedes Mal erhalten, wenn eine Aufgabe fehlschlägt. Dazu können Sie eine JSON-Konfigurationsdatei verwenden, die in etwa so aussieht:

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "script": {
              "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
            }
          }
        ]
      },
      "taskCount": 3,
    }
  ],
  "logsPolicy": {
      "destination": "CLOUD_LOGGING"
  },
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "JOB_STATE_CHANGED"
      }
    },
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "TASK_STATE_CHANGED",
        "newTaskState": "FAILED"
      }
    }
  ]
}

API

Verwenden Sie die REST API, um einen Job zu erstellen, der das Feld notifications und ein oder mehrere jobNotification-Objekte im Hauptteil der JSON-Datei enthält:

{
...
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        ATTRIBUTES
      }
    }
  ]
...
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Projekt-ID des Projekts, das das Pub/Sub-Thema enthält.
  • TOPIC_ID: die Pub/Sub-Thema-ID des Themas, das Sie beim Aktivieren von Pub/Sub-Benachrichtigungen erstellt haben.
  • ATTRIBUTES: Geben Sie eines oder mehrere der folgenden Attribute an, mit denen Sie Benachrichtigungen über den Status des Jobs oder aller seiner Aufgaben erhalten.

    • Geben Sie Folgendes an, um Benachrichtigungen zu allen Änderungen des Jobstatus zu erhalten:

      "type": "JOB_STATE_CHANGED"
      
    • Geben Sie Folgendes an, um Benachrichtigungen zu einer bestimmten Änderung des Jobstatus zu erhalten:

      "type": "JOB_STATE_CHANGED",
      "newJobState": "JOB_STATE"
      

      Ersetzen Sie JOB_STATE durch einen der folgenden Jobstatus:

      • QUEUED
      • SCHEDULED
      • RUNNING
      • SUCCEEDED
      • FAILED

      Weitere Informationen zu Jobstatus finden Sie unter Joblebenszyklus.

    • Geben Sie Folgendes an, um Benachrichtigungen zu allen Änderungen des Aufgabenstatus zu erhalten:

      "type": "TASK_STATE_CHANGED"
      
    • Geben Sie für Benachrichtigungen zu bestimmten Änderungen des Aufgabenstatus Folgendes an:

      "type": "TASK_STATE_CHANGED",
      "newTaskState": "TASK_STATE"
      

      Ersetzen Sie TASK_STATE durch einen der folgenden Aufgabenstatus:

      • PENDING
      • ASSIGNED
      • RUNNING
      • SUCCEEDED
      • FAILED

      Weitere Informationen zu Aufgabenstatus finden Sie unter Joblebenszyklus.

Angenommen, Sie möchten Benachrichtigungen zu allen Änderungen des Jobstatus und jedes Mal erhalten, wenn eine Aufgabe fehlschlägt. Dazu können Sie eine JSON-Konfigurationsdatei verwenden, die in etwa so aussieht:

{
  "taskGroups": [
    {
      "taskSpec": {
        "runnables": [
          {
            "script": {
              "text": "echo Hello World! This is task $BATCH_TASK_INDEX."
            }
          }
        ]
      },
      "taskCount": 3,
    }
  ],
  "logsPolicy": {
      "destination": "CLOUD_LOGGING"
  },
  "notifications": [
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "JOB_STATE_CHANGED"
      }
    },
    {
      "pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
      "message": {
        "type": "TASK_STATE_CHANGED",
        "newTaskState": "FAILED"
      }
    }
  ]
}

Go

import (
	"context"
	"fmt"
	"io"

	batch "cloud.google.com/go/batch/apiv1"
	"cloud.google.com/go/batch/apiv1/batchpb"
	durationpb "google.golang.org/protobuf/types/known/durationpb"
)

// Creates and runs a job with configured notifications
func createJobWithNotifications(w io.Writer, projectID, region, jobName, topicName string) (*batchpb.Job, error) {

	ctx := context.Background()
	batchClient, err := batch.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("batchClient error: %w", err)
	}
	defer batchClient.Close()

	script := &batchpb.Runnable_Script_{
		Script: &batchpb.Runnable_Script{
			Command: &batchpb.Runnable_Script_Text{
				Text: "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
			},
		},
	}

	taskSpec := &batchpb.TaskSpec{
		ComputeResource: &batchpb.ComputeResource{
			// CpuMilli is milliseconds per cpu-second. This means the task requires 2 whole CPUs.
			CpuMilli:  2000,
			MemoryMib: 16,
		},
		MaxRunDuration: &durationpb.Duration{
			Seconds: 3600,
		},
		MaxRetryCount: 2,
		Runnables: []*batchpb.Runnable{{
			Executable: script,
		}},
	}

	taskGroups := []*batchpb.TaskGroup{
		{
			TaskCount: 4,
			TaskSpec:  taskSpec,
		},
	}

	labels := map[string]string{"env": "testing", "type": "container"}

	// Policies are used to define on what kind of virtual machines the tasks will run on.
	// In this case, we tell the system to use "e2-standard-4" machine type.
	// Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
	allocationPolicy := &batchpb.AllocationPolicy{
		Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
			PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
				Policy: &batchpb.AllocationPolicy_InstancePolicy{
					MachineType: "e2-standard-4",
				},
			},
		}},
	}

	// We use Cloud Logging as it's an out of the box available option
	logsPolicy := &batchpb.LogsPolicy{
		Destination: batchpb.LogsPolicy_CLOUD_LOGGING,
	}

	notifications := []*batchpb.JobNotification{
		{
			PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
			Message: &batchpb.JobNotification_Message{
				Type: batchpb.JobNotification_JOB_STATE_CHANGED,
			},
		},
		{
			PubsubTopic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicName),
			Message: &batchpb.JobNotification_Message{
				Type:         batchpb.JobNotification_TASK_STATE_CHANGED,
				NewTaskState: batchpb.TaskStatus_FAILED,
			},
		},
	}

	job := &batchpb.Job{
		Name:             jobName,
		TaskGroups:       taskGroups,
		AllocationPolicy: allocationPolicy,
		Labels:           labels,
		Notifications:    notifications,
		LogsPolicy:       logsPolicy,
	}

	request := &batchpb.CreateJobRequest{
		Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, region),
		JobId:  jobName,
		Job:    job,
	}

	created_job, err := batchClient.CreateJob(ctx, request)
	if err != nil {
		return nil, fmt.Errorf("unable to create job: %w", err)
	}

	fmt.Fprintf(w, "Job created: %v\n", created_job)
	return created_job, nil
}

Java


import com.google.cloud.batch.v1.BatchServiceClient;
import com.google.cloud.batch.v1.CreateJobRequest;
import com.google.cloud.batch.v1.Job;
import com.google.cloud.batch.v1.JobNotification;
import com.google.cloud.batch.v1.JobNotification.Message;
import com.google.cloud.batch.v1.JobNotification.Type;
import com.google.cloud.batch.v1.LogsPolicy;
import com.google.cloud.batch.v1.LogsPolicy.Destination;
import com.google.cloud.batch.v1.Runnable;
import com.google.cloud.batch.v1.Runnable.Script;
import com.google.cloud.batch.v1.TaskGroup;
import com.google.cloud.batch.v1.TaskSpec;
import com.google.cloud.batch.v1.TaskStatus.State;
import com.google.common.collect.Lists;
import com.google.protobuf.Duration;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CreateBatchNotification {

  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    // Project ID or project number of the Google Cloud project you want to use.
    String projectId = "YOUR_PROJECT_ID";
    // Name of the region you want to use to run the job. Regions that are
    // available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
    String region = "europe-central2";
    // The name of the job that will be created.
    // It needs to be unique for each project and region pair.
    String jobName = "JOB_NAME";
    // The Pub/Sub topic ID to send the notifications to.
    String topicId = "TOPIC_ID";

    createBatchNotification(projectId, region, jobName, topicId);
  }

  // Create a Batch job that sends notifications to Pub/Sub
  public static Job createBatchNotification(String projectId, String region, String jobName,
                                            String topicId)
      throws IOException, ExecutionException, InterruptedException, TimeoutException {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests.
    try (BatchServiceClient batchServiceClient = BatchServiceClient.create()) {
      // Define what will be done as part of the job.
      Runnable runnable =
          Runnable.newBuilder()
              .setScript(
                  Script.newBuilder()
                      .setText(
                          "echo Hello world! This is task ${BATCH_TASK_INDEX}. "
                                  + "This job has a total of ${BATCH_TASK_COUNT} tasks.")
                      // You can also run a script from a file. Just remember, that needs to be a
                      // script that's already on the VM that will be running the job.
                      // Using setText() and setPath() is mutually exclusive.
                      // .setPath("/tmp/test.sh")
                      .build())
              .build();

      TaskSpec task = TaskSpec.newBuilder()
              // Jobs can be divided into tasks. In this case, we have only one task.
              .addRunnables(runnable)
              .setMaxRetryCount(2)
              .setMaxRunDuration(Duration.newBuilder().setSeconds(3600).build())
              .build();

      // Tasks are grouped inside a job using TaskGroups.
      // Currently, it's possible to have only one task group.
      TaskGroup taskGroup = TaskGroup.newBuilder()
          .setTaskCount(3)
          .setParallelism(1)
          .setTaskSpec(task)
          .build();

      Job job =
          Job.newBuilder()
              .addTaskGroups(taskGroup)
              .addAllNotifications(buildNotifications(projectId, topicId))
              .putLabels("env", "testing")
              .putLabels("type", "script")
              // We use Cloud Logging as it's an out of the box available option.
              .setLogsPolicy(
                  LogsPolicy.newBuilder().setDestination(Destination.CLOUD_LOGGING))
              .build();

      CreateJobRequest createJobRequest =
          CreateJobRequest.newBuilder()
              // The job's parent is the region in which the job will run.
              .setParent(String.format("projects/%s/locations/%s", projectId, region))
              .setJob(job)
              .setJobId(jobName)
              .build();

      Job result =
          batchServiceClient
              .createJobCallable()
              .futureCall(createJobRequest)
              .get(5, TimeUnit.MINUTES);

      System.out.printf("Successfully created the job: %s", result.getName());

      return result;
    }
  }

  // Creates notification configurations to send messages to Pub/Sub when the state is changed
  private static Iterable<JobNotification> buildNotifications(String projectId, String topicId) {
    String pubsubTopic = String.format("projects/%s/topics/%s", projectId, topicId);

    JobNotification jobStateChanged = JobNotification.newBuilder()
            .setPubsubTopic(pubsubTopic)
            .setMessage(Message.newBuilder().setType(Type.JOB_STATE_CHANGED))
            .build();

    JobNotification taskStateChanged = JobNotification.newBuilder()
            .setPubsubTopic(pubsubTopic)
            .setMessage(Message.newBuilder()
                    .setType(Type.TASK_STATE_CHANGED)
                    .setNewTaskState(State.FAILED))
            .build();

    return Lists.newArrayList(jobStateChanged, taskStateChanged);
  }
}

Node.js

// Imports the Batch library
const batchLib = require('@google-cloud/batch');
const batch = batchLib.protos.google.cloud.batch.v1;

// Instantiates a client
const batchClient = new batchLib.v1.BatchServiceClient();

/**
 * TODO(developer): Update these variables before running the sample.
 */
// Project ID or project number of the Google Cloud project you want to use.
const PROJECT_ID = await batchClient.getProjectId();
// Name of the region you want to use to run the job. Regions that are
// available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
const REGION = 'europe-central2';
// The name of the job that will be created.
// It needs to be unique for each project and region pair.
const JOB_NAME = 'job-name-batch-notifications';
// The Pub/Sub topic ID to send the notifications to.
const TOPIC_ID = 'topic-id';

// Define what will be done as part of the job.
const task = new batch.TaskSpec();
const runnable = new batch.Runnable();
runnable.script = new batch.Runnable.Script();
runnable.script.commands = [
  '-c',
  'echo Hello world! This is task ${BATCH_TASK_INDEX}.',
];
task.runnables = [runnable];
task.maxRetryCount = 2;
task.maxRunDuration = {seconds: 3600};

// Tasks are grouped inside a job using TaskGroups.
const group = new batch.TaskGroup();
group.taskCount = 3;
group.taskSpec = task;

// Create batch notification when job state changed
const notification1 = new batch.JobNotification();
notification1.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
notification1.message = {
  type: 'JOB_STATE_CHANGED',
};

// Create batch notification when task state changed
const notification2 = new batch.JobNotification();
notification2.pubsubTopic = `projects/${PROJECT_ID}/topics/${TOPIC_ID}`;
notification2.message = {
  type: 'TASK_STATE_CHANGED',
  newTaskState: 'FAILED',
};

const job = new batch.Job();
job.name = JOB_NAME;
job.taskGroups = [group];
job.notifications = [notification1, notification2];
job.labels = {env: 'testing', type: 'script'};
// We use Cloud Logging as it's an option available out of the box
job.logsPolicy = new batch.LogsPolicy();
job.logsPolicy.destination = batch.LogsPolicy.Destination.CLOUD_LOGGING;
// The job's parent is the project and region in which the job will run
const parent = `projects/${PROJECT_ID}/locations/${REGION}`;

async function callCreateBatchNotifications() {
  // Construct request
  const request = {
    parent,
    jobId: JOB_NAME,
    job,
  };

  // Run request
  const [response] = await batchClient.createJob(request);
  console.log(JSON.stringify(response));
}

await callCreateBatchNotifications();

Python

from google.cloud import batch_v1


def create_with_pubsub_notification_job(
    project_id: str, region: str, job_name: str, topic_name: str
) -> batch_v1.Job:
    """
    This method shows how to create a sample Batch Job that will run
    a simple command inside a container on Cloud Compute instances.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        region: name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/locations
        job_name: the name of the job that will be created.
            It needs to be unique for each project and region pair.
        topic_name: the name of the Pub/Sub topic to which the notification will be sent.
            The topic should be created in GCP Pub/Sub before running this method.
            The procedure for creating a topic is listed here: https://cloud.google.com/pubsub/docs/create-topic

    Returns:
        A job object representing the job created.
    """

    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    # Jobs can be divided into tasks. In this case, we have only one task.
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()
    group.task_count = 4
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # In this case, we tell the system to use "e2-standard-4" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    # Configuring the first notification
    notification1 = batch_v1.JobNotification()
    notification1.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
    # Define the message that will be sent to the topic
    first_massage = batch_v1.JobNotification.Message()
    # Specify the new job state that will trigger the notification
    # In this case, the notification is triggered when the job state changes to SUCCEEDED
    first_massage.type_ = batch_v1.JobNotification.Type.JOB_STATE_CHANGED
    first_massage.new_job_state = batch_v1.JobStatus.State.SUCCEEDED
    # Assign the message to the notification
    notification1.message = first_massage

    # Configuring the second notification
    notification2 = batch_v1.JobNotification()
    notification2.pubsub_topic = f"projects/{project_id}/topics/{topic_name}"
    second_message = batch_v1.JobNotification.Message()
    second_message.type_ = batch_v1.JobNotification.Type.TASK_STATE_CHANGED
    second_message.new_task_state = batch_v1.TaskStatus.State.FAILED
    notification2.message = second_message

    # Assign a list of notifications to the job.
    job.notifications = [notification1, notification2]

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run
    create_request.parent = f"projects/{project_id}/locations/{region}"
    return client.create_job(create_request)

Nachdem der Job gestartet wurde, können Sie die zugehörigen Benachrichtigungen verwenden. Wenn das Pub/Sub-Thema für Ihren Job beispielsweise ein Abo hat, das Benachrichtigungen an BigQuery streamt, können Sie die Pub/Sub-Benachrichtigungen in BigQuery analysieren.

Nächste Schritte