Schedule custom Spark and SparkSQL tasks


Dataplex supports scheduling custom code execution, either as a one-time run or on a regular schedule. You can schedule custom data transformations using Spark (Java), PySpark (limited to Spark version 3.2), or SparkSQL. Dataplex executes the code using serverless Spark processing and a built-in serverless scheduler.

Terminology

  • Task: A Dataplex task represents the work that you want Dataplex to do on a schedule. It encapsulates your code, your parameters, and the schedule.

  • Job: A job represents a single run of a Dataplex task. For example, if a task is scheduled to run daily, Dataplex will create a job every day.

Scheduling modes

Dataplex supports two modes of scheduling:

  • Run once: Use this mode to run your task just once. You can choose to run it immediately or at a set time in the future. If you run the task immediately, it may still take up to two minutes for the execution to begin.

  • Run on a schedule: Use this mode to run the task on a repeated frequency. Supported repetitions are daily, weekly, monthly, or custom.

Before you begin

  1. Enable the Dataproc API.

    Enable the Dataproc API
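
    You can also enable the API by using the gcloud CLI; for example:

    gcloud services enable dataproc.googleapis.com --project=<project-name>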

  2. Enable Private Google Access on the network or subnetwork that you use with Dataplex tasks. If you don't specify a network or subnetwork when creating the Dataplex task, Dataplex uses the default subnet, and you must enable Private Google Access for the default subnet.
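
    For example, to enable Private Google Access on the default subnet by using the gcloud CLI (substitute your own region):

    gcloud compute networks subnets update default --region=<region> --enable-private-ip-google-access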

  3. Create a service account. A service account is required to schedule any Dataplex task. The service account must belong to the project in which you execute the tasks, and it must have the following permissions (a gcloud sketch follows this list):

    • Access to the BigQuery and/or Cloud Storage data that is being processed.

    • The Dataproc Worker role in the project where the task will be executed.

    • If the task needs to read or update the Dataproc Metastore instance attached to the lake, the service account needs the Dataproc Metastore Viewer or Editor role. This role must be granted in the project where the Dataplex lake is set up.

    • If the task is a Spark SQL job, you need to grant the service account the Dataplex Developer role. This role must be granted in the project where the Dataplex lake is set up.

    • If the task is a Spark SQL job, the service account needs Cloud Storage admin permissions on the bucket where results are written.
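
    A minimal gcloud sketch, using the placeholder account name dataplex-task-sa (grant the remaining data-access and Dataplex roles from the list above as they apply to your task):

    # Create the service account; the name is a placeholder.
    gcloud iam service-accounts create dataplex-task-sa --project=<project-name>

    # Grant the Dataproc Worker role in the project where the task will be executed.
    gcloud projects add-iam-policy-binding <project-name> --member="serviceAccount:dataplex-task-sa@<project-name>.iam.gserviceaccount.com" --role="roles/dataproc.worker"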

  4. Grant the user submitting the job the iam.serviceAccounts.actAs permission on the service account.

  5. Grant the Dataplex lake service account permissions to use the service account. You can find the Dataplex lake service account in the Lake Details page of the Google Cloud console.
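
    Steps 4 and 5 can both be satisfied by granting the Service Account User role, which carries iam.serviceAccounts.actAs. A sketch, reusing the placeholder account from step 3:

    # Let the submitting user act as the task service account.
    gcloud iam service-accounts add-iam-policy-binding dataplex-task-sa@<project-name>.iam.gserviceaccount.com --member="user:<user-email>" --role="roles/iam.serviceAccountUser"

    # Let the Dataplex lake service account use the task service account.
    gcloud iam service-accounts add-iam-policy-binding dataplex-task-sa@<project-name>.iam.gserviceaccount.com --member="serviceAccount:<lake-service-account>" --role="roles/iam.serviceAccountUser"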

  6. If the project in which your Dataplex lake resides is different from the project in which the task is to be executed, grant the Dataplex lake service account the Dataproc Editor role in the project where the task will be executed.

  7. Place the required code artifacts (JAR, Python, or SQL script files) or archive files (.jar, .tar, .tar.gz, .tgz, .zip) in a Cloud Storage path.

  8. Make sure the service account has the required storage.objects.get permission on the Cloud Storage bucket that stores these code artifacts, as in the sketch below.
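
    For example (the bucket, path, and account names are placeholders; the Storage Object Viewer role includes storage.objects.get):

    # Upload the code artifact.
    gcloud storage cp my-task.jar gs://<bucket-name>/artifacts/

    # Grant read access on the bucket to the task service account.
    gcloud storage buckets add-iam-policy-binding gs://<bucket-name> --member="serviceAccount:dataplex-task-sa@<project-name>.iam.gserviceaccount.com" --role="roles/storage.objectViewer"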

Schedule a Spark (Java or Python) task

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. Click Create Task.

  4. Under Create Custom Spark Task, click Create task.

  5. Choose a Dataplex lake.

  6. Provide a task name.

  7. Create an ID for your task.

  8. Under Task configuration > Type, choose Spark or PySpark.

  9. Enter the relevant arguments.

  10. Under Service account, enter a user-managed service account for your custom Spark task to execute as.

  11. Click Continue.

  12. (Optional) Set schedule: select Run once or Repeat, and fill in the required fields.

  13. Click Continue.

  14. (Optional) Customize resources and Add additional settings.

  15. Click Create.

gcloud

You can schedule a Spark (Java or Python) task by using the gcloud CLI. The following table lists the required and optional parameters:

Parameter Description
--lake The lake ID for the lake resource of the Dataplex service.
--location The location of the Dataplex service.
--spark-main-class The name of the driver's main class. The JAR file that contains the class must be in the default CLASSPATH.
--spark-main-jar-file-uri The Cloud Storage URI of the JAR file that contains the main class.
--spark-archive-uris (Optional) Cloud Storage URIs of archives to be extracted into the working directory of each executor. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.
--spark-file-uris (Optional) Cloud Storage URIs of files to be placed into the working directory of each executor.
--batch-executors-count (Optional) The total number of job executors. The default value is 2.
--batch-max-executors-count (Optional) The maximum number of configurable executors. The default value is 1000. If batch-max-executors-count is greater than batch-executors-count, then autoscaling is enabled.
--container-image-java-jars (Optional) A list of Java JARs to add to the classpath. Valid input includes Cloud Storage URIs to JAR binaries.
For example, gs://bucket-name/my/path/to/file.jar.
--container-image-properties (Optional) Property keys, specified in a prefix:property format.
For example, core:hadoop.tmp.dir.
For more information, see Cluster properties.
--vpc-network-tags (Optional) A list of network tags to apply to the job.
--vpc-network-name (Optional) The Virtual Private Cloud network in which the job runs. By default, the VPC network named default within the project is used.
Note that you should use only one of --vpc-network-name or --vpc-sub-network-name.
--vpc-sub-network-name (Optional) The VPC sub-network in which the job runs.
Note that you should use only one of --vpc-sub-network-name or --vpc-network-name.
--trigger-type Trigger type of the user-specified task. Values must be one of:
ON_DEMAND - The task runs one time shortly after task creation.
RECURRING - The task runs periodically on a schedule.
--trigger-start-time (Optional) The time of the first run of the task. The format for this is `{year}-{month}-{day}T{hour}:{min}:{sec}Z`, where the timezone is UTC. For example, "2017-01-15T01:30:00Z" encodes 01:30 UTC on January 15, 2017. If this value is not specified, the task will run after being submitted if the trigger type is ON_DEMAND, or on the specified schedule if the trigger type is RECURRING.
--trigger-disabled (Optional) Prevents the task from executing. This does not cancel already running tasks. It is intended to temporarily disable RECURRING tasks.
--trigger-max-retries (Optional) The number of retry attempts before aborting. Set the value to zero to never attempt to retry a failed task.
--trigger-schedule Cron schedule for running tasks periodically.
--description (Optional) Description of the task.
--display-name (Optional) Display name of the task.
--labels (Optional) List of label KEY=VALUE pairs to add.
--execution-args (Optional) The arguments to pass to the task. Arguments can be a mix of key-value pairs and positional arguments. You can pass a comma-separated list of key-value pairs as execution arguments. To pass positional arguments, set the key to TASK_ARGS, and set the value to a comma-separated string of all the positional arguments. To use a delimiter other than a comma, refer to escaping.
If key-value and positional arguments are passed together, TASK_ARGS is passed as the last argument.
--execution-service-account Service account to use to execute a task.
--max-job-execution-lifetime (Optional) The maximum duration before the job execution expires.
--container-image (Optional) Custom container image for the job runtime environment. If not specified, a default container image will be used.
--kms-key (Optional) The Cloud KMS key to use for encryption, in the format:
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Java example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND --spark-main-jar-file-uri=<gcs location of jar file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value2,key3=value3 --execution-args=arg1=value1,arg2=value2,arg3=value3 <task-id>

PySpark example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<cron schedule, see https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location of python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

Use the API Explorer to create a task.
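
The following curl sketch shows roughly what the request looks like; the field names follow the Dataplex v1 Task resource, but verify them against the API reference before relying on them:

curl -X POST -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json" "https://dataplex.googleapis.com/v1/projects/<project-name>/locations/<location>/lakes/<lake-id>/tasks?taskId=<task-id>" -d '{"triggerSpec": {"type": "ON_DEMAND"}, "executionSpec": {"serviceAccount": "<service-account-email>"}, "spark": {"mainJarFileUri": "<gcs-uri-of-jar-file>"}}'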

Schedule a SparkSQL task

gcloud

To schedule a SparkSQL task, run the same gcloud CLI command as in the preceding section, with the following additional parameters:

Parameter Description
--spark-sql-script The SQL query text. Either spark-sql-script or spark-sql-script-file is required.
--spark-sql-script-file A reference to a query file. This can be the Cloud Storage URI of the query file. Either spark-sql-script or spark-sql-script-file is required.
--execution-args For SparkSQL tasks, the following arguments are mandatory and must be passed as positional arguments:
--output_location, <GCS URI of the output directory>
--output_format, <output file format>
Supported formats are csv, json, parquet, and orc.

SparkSQL example:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

Use the API Explorer to create a task.

Monitor your task

Console

  1. In the Google Cloud console, go to the Dataplex page:

    Go to Dataplex

  2. Navigate to the Process view.

  3. The Tasks tab shows a list of tasks, filtered by task template type.

  4. In the Name column, click any task you want to view.

  5. Under Jobs > Job ID, click the job ID of the job you want to view. This opens a Dataproc page in the Google Cloud console where you can view monitoring and output details.

gcloud

The following table lists the gcloud CLI commands for monitoring your tasks.

Action gcloud CLI command
Listing tasks gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
Viewing task details gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Listing jobs of a task gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
Viewing job details gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex executes jobs on Dataproc Serverless (Batches). To view the execution logs of a Dataplex job, follow these steps:

  1. Get the Dataproc Serverless (Batches) job ID. Run the following command:

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

  2. View the logs. Run the following command, using the job ID you got from running the previous command:

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>

REST

Use the API Explorer to get or list a task or job.
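
For example, a sketch of listing the jobs of a task over REST (again, verify the endpoint against the API reference before relying on it):

curl -H "Authorization: Bearer $(gcloud auth print-access-token)" "https://dataplex.googleapis.com/v1/projects/<project-name>/locations/<location>/lakes/<lake-id>/tasks/<task-id>/jobs"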

Manage the schedule

In the Google Cloud console, within Dataplex, you can edit the schedule of a task, delete a task, or cancel an ongoing job. The following table lists the gcloud CLI commands for these actions.

Action gcloud CLI command
Edit task schedule gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
Delete a task gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Cancel a job gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

What's next