Schedule custom Spark and Spark SQL tasks

Dataplex supports scheduling custom code execution as a one-time run, on a regular schedule, or on demand. On-demand execution is in Preview and is available only through the API. You can schedule custom data transformations using Spark (Java), PySpark (limited to Spark version 3.2), or Spark SQL. Dataplex executes the code using serverless Spark processing and a built-in serverless scheduler.

Terminology

Task
A Dataplex task represents the work that you want Dataplex to do on a schedule. It encapsulates your code, your parameters, and the schedule.
Job
A job represents a single run of a Dataplex task. For example, if a task is scheduled to run daily, Dataplex creates a job every day.

For jobs created on or after May 10, 2023, the Trigger field shows the job's execution trigger type.

The following are the job execution trigger types:

  • RUN_REQUEST: Indicates that the job was executed because the RunTask API was called.

  • TASK_CONFIG: Indicates that the job was executed because of the task's TriggerSpec configuration.

Scheduling modes

Dataplex supports the following scheduling modes:

Run once
Use this mode to run your task just once. You can choose to run it immediately or at a set time in the future. If you run the task immediately, it may still take up to two minutes for the execution to begin.
Run on a schedule
Use this mode to run the task on a repeated frequency. Supported repetitions are daily, weekly, monthly, or custom.
Run on-demand
Use this mode to run a previously created task on demand. On-demand runs are supported only through the RunTask API. When your job runs on demand, Dataplex creates a job using the task's existing parameters. You can specify ExecutionSpec arguments and labels for the run.
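
For example, the following curl command sketches an on-demand run of an existing task through the RunTask API. The project, location, lake, and task names (my-project, us-central1, my-lake, my-task) are placeholders, and the request-body fields shown (args, labels) mirror the description above; verify them against the RunTask API reference before use.

# Sketch: trigger an on-demand run of a previously created task.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataplex.googleapis.com/v1/projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task:run" \
  -d '{"args": {"arg1": "value1"}, "labels": {"team": "analytics"}}'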

Before you begin

  1. Enable the Dataproc API.

  2. Enable Private Google Access on the network and subnetwork that you use with Dataplex tasks. If you don't specify a network or a subnetwork when creating the Dataplex task, Dataplex uses the default subnetwork, and you must enable Private Google Access for the default subnetwork.

  3. Create a service account. A service account is required to schedule Dataplex tasks, and it must belong to the project in which you execute the tasks. The service account must have the following permissions (example gcloud commands for these setup steps appear after this list):

    • Access to the BigQuery and Cloud Storage data that is being processed.

    • The Dataproc Worker role (roles/dataproc.worker) on the project in which you execute the task.

    • If the task needs to read or update the Dataproc Metastore instance attached to the lake, the service account needs the Dataproc Metastore Viewer or Editor role. This role must be granted in the project where the Dataplex lake is set up.

    • If the task is a Spark SQL job, you need to grant the service account the Dataplex Developer role. This role must be granted in the project where the Dataplex lake is set up.

    • If the task is a Spark SQL job, the service account needs Cloud Storage administrator permissions on the bucket where results are written.

    • To schedule and run Spark SQL and custom Spark tasks, the service account must be granted the Dataplex Metadata Reader (roles/dataplex.metadataReader), Dataplex Viewer (roles/dataplex.viewer), and Dataproc Metastore Metadata User (roles/metastore.metadataUser) IAM roles.

  4. Grant the user submitting the job the Service Account User role (roles/iam.serviceAccountUser) on the service account. For instructions, see Manage access to service accounts.

  5. Grant the Dataplex lake service account permissions to use the service account. You can find the Dataplex lake service account in the Lake Details page of the Google Cloud console.

  6. If the project containing your Dataplex lake is different from the project in which the task is to be executed, grant the Dataplex lake service account the Dataproc Editor role (roles/dataproc.editor) on the project in which you execute the task.

  7. Place the required code artifacts (JARs, Python, or SQL script files) or archived files (.jar, .tar, .tar.gz, .tgz, .zip) in a Cloud Storage path.

  8. Make sure the service account has the storage.objects.get permission on the Cloud Storage bucket that stores these code artifacts.
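
The following gcloud sketch strings these setup steps together. All names in it (my-project, us-central1, my-bucket, the dataplex-task-runner service account, and the user email) are placeholders, and the role bindings cover only the permissions listed above; adapt them to your project and data layout.

# Placeholders used throughout this sketch.
PROJECT=my-project
REGION=us-central1
SA=dataplex-task-runner
SA_EMAIL=${SA}@${PROJECT}.iam.gserviceaccount.com

# Step 1: Enable the Dataproc API.
gcloud services enable dataproc.googleapis.com --project=${PROJECT}

# Step 2: Enable Private Google Access on the (default) subnetwork used by the task.
gcloud compute networks subnets update default \
    --project=${PROJECT} --region=${REGION} \
    --enable-private-ip-google-access

# Step 3: Create the service account and grant it the roles called out above.
gcloud iam service-accounts create ${SA} --project=${PROJECT}
for ROLE in roles/dataproc.worker roles/dataplex.metadataReader \
            roles/dataplex.viewer roles/metastore.metadataUser; do
  gcloud projects add-iam-policy-binding ${PROJECT} \
      --member="serviceAccount:${SA_EMAIL}" --role="${ROLE}"
done

# Step 4: Let the submitting user act as the service account.
gcloud iam service-accounts add-iam-policy-binding ${SA_EMAIL} \
    --member="user:you@example.com" --role="roles/iam.serviceAccountUser"

# Steps 7 and 8: Stage the code artifact and let the service account read it.
gcloud storage cp my-spark-job.jar gs://my-bucket/artifacts/
gcloud storage buckets add-iam-policy-binding gs://my-bucket \
    --member="serviceAccount:${SA_EMAIL}" --role="roles/storage.objectViewer"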

Schedule a Spark (Java or Python) task

Console

  1. In the Google Cloud console, go to the Dataplex page.

  2. Navigate to the Process view.

  3. Click Create Task.

  4. For Create Custom Spark Task, click Create task.

  5. Choose a Dataplex lake.

  6. Provide a task name.

  7. Create an ID for your task.

  8. In the Task configuration section, for Type, select Spark or PySpark.

  9. Enter the relevant arguments.

  10. In the Service account field, enter a user service account for your custom Spark task to run as.

  11. Click Continue.

  12. Optional: To set a schedule, select Run once or Repeat, and fill in the required fields.

  13. Click Continue.

  14. Optional: Customize resources and Add additional settings.

  15. Click Create.

gcloud

You can schedule a Spark (Java or Python) task by using the gcloud CLI. The following table lists the required and optional parameters:

Parameter Description
--lake The lake ID for the lake resource of the Dataplex service.
--location The location of the Dataplex service.
--spark-main-class The main class of the driver. The jar file that contains the class must be in the default CLASSPATH.
--spark-main-jar-file-uri The Cloud Storage URI of the jar file that contains the main class.
--spark-archive-uris Optional: Cloud Storage URIs of archives to be extracted into the working directory of each executor. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.
--spark-file-uris Optional: Cloud Storage URIs of files to be placed into the working directory of each executor.
--batch-executors-count Optional: The total number of job executors. The default value is 2.
--batch-max-executors-count Optional: The maximum number of configurable executors. The default value is 1000. If batch-max-executors-count is greater than batch-executors-count, then Dataplex enables autoscaling.
--container-image-java-jars Optional: A list of Java JARs to add to the classpath. Valid input includes Cloud Storage URIs to JAR binaries.
For example, gs://bucket-name/my/path/to/file.jar.
--container-image-properties Optional: Property keys, specified in a prefix:property format.
For example, core:hadoop.tmp.dir.
For more information, see Cluster properties.
--vpc-network-tags Optional: A list of network tags to apply to the job.
--vpc-network-name Optional: The Virtual Private Cloud network in which the job runs. By default, Dataplex uses the VPC network named default within the project.
You must use only one of --vpc-network-name or --vpc-sub-network-name.
--vpc-sub-network-name Optional: The VPC subnetwork in which the job runs.
You must use only one of --vpc-sub-network-name or --vpc-network-name.
--trigger-type Trigger type of the user-specified task. Values must be one of:
ON_DEMAND - The task runs one time shortly after task creation.
RECURRING - The task runs periodically on a schedule.
--trigger-start-time Optional: The time of the first run of the task. The format is `{year}-{month}-{day}T{hour}:{min}:{sec}Z`, where the timezone is UTC. For example, "2017-01-15T01:30:00Z" encodes 01:30 UTC on January 15, 2017. If this value is not specified, the task will run after being submitted if the trigger type is ON_DEMAND, or on the specified schedule if the trigger type is RECURRING.
--trigger-disabled Optional: Prevents the task from executing. This parameter doesn't cancel jobs that are already running; it temporarily disables RECURRING tasks.
--trigger-max-retires Optional: The number of retry attempts before aborting. Set the value to zero to never attempt to retry a failed task.
--trigger-schedule Cron schedule for running tasks periodically.
--description Optional: Description of the task.
--display-name Optional: Display name of the task.
--labels Optional: List of label KEY=VALUE pairs to add.
--execution-args Optional: The arguments to pass to the task. Arguments can be a mix of key-value pairs and positional arguments. You can pass a comma-separated list of key-value pairs as execution arguments. To pass positional arguments, set the key to TASK_ARGS, and set the value to a comma-separated string of all the positional arguments. To use a delimiter other than a comma, refer to escaping.
If key-value and positional arguments are passed together, TASK_ARGS is passed as the last argument.
--execution-service-account Service account to use to execute a task.
--max-job-execution-lifetime Optional: The maximum duration before the job execution expires.
--container-image Optional: Custom container image for the job runtime environment. If not specified, a default container image will be used.
--kms-key Optional: The Cloud KMS key to use for encryption, in the format:
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Java example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND --spark-main-jar-file-uri=<gcs-location-of-jar-file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts, for example 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value2,key3=value3 --execution-args=arg1=value1,arg2=value2,arg3=value3 <task-id>

PySpark example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<cron-schedule> --spark-python-script-file=<gcs-location-of-python-script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

To create a task, use the APIs Explorer.
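
As a sketch of the equivalent REST call, the following curl command creates a recurring PySpark task. The resource names are placeholders, and the field names (triggerSpec, executionSpec, spark, pythonScriptFile) follow the Dataplex task model described above; confirm them in the tasks.create API reference before use.

# Sketch: create a recurring PySpark task through the REST API.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataplex.googleapis.com/v1/projects/my-project/locations/us-central1/lakes/my-lake/tasks?taskId=my-task" \
  -d '{
    "triggerSpec": {"type": "RECURRING", "schedule": "0 3 * * *"},
    "executionSpec": {
      "serviceAccount": "dataplex-task-runner@my-project.iam.gserviceaccount.com",
      "args": {"TASK_ARGS": "pos-arg1,pos-arg2"}
    },
    "spark": {"pythonScriptFile": "gs://my-bucket/scripts/job.py"}
  }'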

Schedule a Spark SQL task

gcloud

To schedule a Spark SQL task, run the same gcloud CLI command as in the Schedule a Spark (Java or Python) task section, with the following additional parameters:

Parameter Description
--spark-sql-script The SQL query text. Either spark-sql-script or spark-sql-script-file is required.
--spark-sql-script-file A reference to a query file. This value can be the Cloud Storage URI of the query file or the path to the SQL script content. Either spark-sql-script or spark-sql-script-file is required.
--execution-args For Spark SQL tasks, the following arguments are mandatory and must be passed as positional arguments:
--output_location, <GCS uri of the output directory>
--output_format, <output file format>.
Supported formats are CSV, JSON, Parquet, and ORC.

Spark SQL example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

To create a task, use the APIs Explorer.

Monitor your task

Console

  1. In the Google Cloud console, go to the Dataplex page.

  2. Navigate to the Process view.

  3. In the Tasks tab, view the list of tasks, filtered by task template type.

  4. In the Name column, click any task you want to view.

  5. Click the Job ID of the task you want to view.

    The Dataproc page opens in the Google Cloud console, where you can view the monitoring and output details.

gcloud

The following table lists the gcloud CLI commands for monitoring your tasks.

Action gcloud CLI command
Listing tasks gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
Viewing task details gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Listing jobs of a task gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
Viewing job details gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex executes jobs on Dataproc Serverless (Batches). To view the execution logs of a Dataplex job, follow these steps:

  1. Get the Dataproc Serverless (Batches) job ID. Run the following command:

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. View the logs. Run the following command, using the Dataproc Serverless (Batches) job ID from the output of the previous command:

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

To get or list a task or job, use the APIs Explorer.
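
For example, listing the jobs of a task is a GET request on the task's jobs collection; the names below are the same placeholders used in the earlier sketches.

# Sketch: list the jobs of an existing task through the REST API.
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dataplex.googleapis.com/v1/projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task/jobs"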

Manage the schedule

In the Google Cloud console, within Dataplex, you can edit the schedule of a task, delete a task, or cancel an ongoing job. The following table lists the gcloud CLI commands for these actions.

Action gcloud CLI command
Edit task schedule gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
Delete a task gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Cancel a job gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

What's next