Dataplex supports scheduling custom code execution as a one-time run, on a recurring schedule, or on demand (on-demand runs are in public preview and available only through the API). You can schedule custom data transformations using Spark (Java), PySpark (limited to Spark version 3.2), or SparkSQL. Dataplex executes the code using serverless Spark processing and a built-in serverless scheduler.
Terminology
- Task
- A Dataplex task represents the work that you want Dataplex to do on a schedule. It encapsulates your code, your parameters, and the schedule.
- Job
- A job represents a single run of a Dataplex task. For example, if a task is scheduled to run daily, Dataplex will create a job every day.
Scheduling modes
Dataplex supports the following scheduling modes:
- Run once
- Use this mode to run your task just once. You can choose to run it immediately or at a set time in the future. If you run the task immediately, it may still take up to two minutes for the execution to begin.
- Run on a schedule
- Use this mode to run the task on a repeated frequency. Supported repetitions are daily, weekly, monthly, or custom.
- Run on-demand (public preview)
- Use this mode to run a previously created task on demand. This scheduling mode is currently in preview and is not recommended for production workloads. It is supported only in the RunTask API. When your job runs on demand, Dataplex uses the existing task parameters to create a job. Parallel job executions are not supported.
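Because on-demand execution is available only through the RunTask API, the following curl sketch shows roughly what such a call looks like. It is a minimal, hedged example: the v1 :run URL form and the placeholder project, location, lake, and task IDs are assumptions, so confirm the exact request shape against the RunTask reference.

```bash
# Hedged sketch: trigger an on-demand run of an existing Dataplex task.
# The :run URL form and all placeholder IDs are assumptions; verify against the RunTask reference.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataplex.googleapis.com/v1/projects/<project-id>/locations/<location>/lakes/<lake-id>/tasks/<task-id>:run"
```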
Before you begin
Enable the Dataproc API.
Enable Private Google Access for the network and/or subnetwork that you use with Dataplex tasks. If you don't specify a network or subnetwork when creating the Dataplex task, Dataplex uses the default subnet, and you must enable Private Google Access for the default subnet.
Create a service account. A service account is required to schedule any Dataplex task. The service account must belong to the project in which you execute the tasks and must have the following permissions:
- Access to the BigQuery and/or Cloud Storage data that is being processed.
- The Dataproc Worker role in the project where the task is executed.
- If the task needs to read or update the Dataproc Metastore instance attached to the lake, the Dataproc Metastore Viewer or Editor role, granted in the project where the Dataplex lake is set up.
- If the task is a Spark SQL job, the Dataplex Developer role, granted in the project where the Dataplex lake is set up.
- If the task is a Spark SQL job, Cloud Storage admin permissions on the bucket where results are written.
Grant the user submitting the job the Service Account User role (roles/iam.serviceAccountUser) on the service account. For instructions, see Manage access to service accounts.
Grant the Dataplex lake service account permissions to use the service account. You can find the Dataplex lake service account on the Lake Details page of the Google Cloud console.
If the project in which your Dataplex lake resides is different from the project in which the task is executed, grant the Dataplex lake service account the Dataproc Editor role in the project where the task is executed.
Place the required code artifacts (JAR, Python, or SQL script files) or archived files (.jar, .tar, .tar.gz, .tgz, .zip) in a Cloud Storage path.
Make sure the service account has the storage.objects.get permission on the Cloud Storage bucket that stores these code artifacts. An example sketch of these setup steps follows.
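The commands below illustrate one way to work through these prerequisites with the gcloud CLI. The project, subnetwork, bucket, and account names are placeholders, and the exact roles depend on what your task reads and writes, so treat this as an outline rather than a complete configuration.

```bash
# Enable the Dataproc API (placeholder project name).
gcloud services enable dataproc.googleapis.com --project=my-project

# Enable Private Google Access on the subnetwork the task will use.
gcloud compute networks subnets update default \
    --region=us-central1 \
    --enable-private-ip-google-access

# Create the service account the task will run as.
gcloud iam service-accounts create dataplex-task-runner --project=my-project

# Grant the Dataproc Worker role in the project where the task runs.
gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:dataplex-task-runner@my-project.iam.gserviceaccount.com" \
    --role="roles/dataproc.worker"

# Let the user who submits the task act as the service account.
gcloud iam service-accounts add-iam-policy-binding \
    dataplex-task-runner@my-project.iam.gserviceaccount.com \
    --member="user:alice@example.com" \
    --role="roles/iam.serviceAccountUser"

# Give the service account read access to the bucket that stores the code artifacts
# (roles/storage.objectViewer includes storage.objects.get).
gcloud storage buckets add-iam-policy-binding gs://my-artifacts-bucket \
    --member="serviceAccount:dataplex-task-runner@my-project.iam.gserviceaccount.com" \
    --role="roles/storage.objectViewer"
```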
Schedule a Spark (Java or Python) task
Console
In the Google Cloud console, go to the Dataplex page:
Navigate to the Process view.
Click Create Task.
Under Create Custom Spark Task, click Create task.
Choose a Dataplex lake.
Provide a task name.
Create an ID for your task.
Under Task configuration > Type, choose Spark or PySpark.
Enter the relevant arguments.
Under Service account, enter a user service account that your custom Spark task can execute with.
Click Continue.
(Optional) Set schedule: select Run once or Repeat. Fill in the needed fields.
Click Continue.
(Optional) Customize resources and Add additional settings.
Click Create.
gcloud
You can schedule a Spark (Java / Python) task using the gcloud CLI command. The following table lists the required and optional parameters to use:
Parameter | Description |
---|---|
--lake | The lake ID for the lake resource of the Dataplex service. |
--location | The location of the Dataplex service. |
--spark-main-class | The name of the driver's main class. The JAR file that contains the class must be in the default CLASSPATH. |
--spark-main-jar-file-uri | The Cloud Storage URI of the JAR file that contains the main class. |
--spark-archive-uris | (Optional) Cloud Storage URIs of archives to be extracted into the working directory of each executor. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip. |
--spark-file-uris | (Optional) Cloud Storage URIs of files to be placed into the working directory of each executor. |
--batch-executors-count | (Optional) The total number of job executors. The default value is 2. |
--batch-max-executors-count | (Optional) The maximum number of configurable executors. The default value is 1000. If batch-max-executors-count is greater than batch-executors-count, auto-scaling is enabled. |
--container-image-java-jars | (Optional) A list of Java JARs to add to the classpath. Valid input includes Cloud Storage URIs to JAR binaries. For example, gs://bucket-name/my/path/to/file.jar. |
--container-image-properties | (Optional) Property keys, specified in a prefix:property format. For example, core:hadoop.tmp.dir. For more information, see Cluster properties. |
--vpc-network-tags | (Optional) A list of network tags to apply to the job. |
--vpc-network-name | (Optional) The Virtual Private Cloud network in which the job runs. By default, the VPC network named Default within the project is used. Use only one of --vpc-network-name or --vpc-sub-network-name. |
--vpc-sub-network-name | (Optional) The VPC sub-network in which the job runs. Use only one of --vpc-sub-network-name or --vpc-network-name. |
--trigger-type | Trigger type of the user-specified task. Values must be one of: ON_DEMAND (the task runs one time shortly after task creation) or RECURRING (the task runs periodically on a schedule). |
--trigger-start-time | (Optional) The time of the first run of the task. The format is `{year}-{month}-{day}T{hour}:{min}:{sec}Z`, where the timezone is UTC. For example, "2017-01-15T01:30:00Z" encodes 01:30 UTC on January 15, 2017. If this value is not specified, the task runs after being submitted if the trigger type is ON_DEMAND, or on the specified schedule if the trigger type is RECURRING. |
--trigger-disabled | (Optional) Prevents the task from executing. This does not cancel already running tasks; it is intended to temporarily disable RECURRING tasks. |
--trigger-max-retries | (Optional) The number of retry attempts before aborting. Set the value to zero to never attempt to retry a failed task. |
--trigger-schedule | Cron schedule for running tasks periodically. For example, 0 3 * * * runs the task once a day. |
--description | (Optional) Description of the task. |
--display-name | (Optional) Display name of the task. |
--labels | (Optional) List of label KEY=VALUE pairs to add. |
--execution-args | (Optional) The arguments to pass to the task. Arguments can be a mix of key-value pairs. You can pass a comma-separated list of key-value pairs as execution arguments. To pass positional arguments, set the key to TASK_ARGS and set the value to a comma-separated string of all the positional arguments. To use a delimiter other than a comma, refer to escaping. If key-value and positional arguments are passed together, TASK_ARGS is passed as the last argument. |
--execution-service-account | Service account to use to execute the task. |
--max-job-execution-lifetime | (Optional) The maximum duration before the job execution expires. |
--container-image | (Optional) Custom container image for the job runtime environment. If not specified, a default container image is used. |
--kms-key | (Optional) The Cloud KMS key to use for encryption, in the format projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}. |
Java example:
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND --spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value2,key3=value3 --execution-args=arg1=value1,arg2=value2,arg3=value3 <task-id>
PySpark example:
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>
REST
Use the API Explorer to create a task.
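If you prefer to call the REST API directly rather than the API Explorer, a request along the following lines creates a Spark task. This is a sketch: the JSON field names (triggerSpec, executionSpec, spark.mainJarFileUri) and placeholder values are assumptions about the v1 Task resource, so verify them against the tasks.create reference.

```bash
# Hedged sketch: create a recurring Spark task through the v1 REST API.
# Field names, placeholder IDs, paths, and the cron expression are assumptions.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataplex.googleapis.com/v1/projects/<project-id>/locations/<location>/lakes/<lake-id>/tasks?taskId=<task-id>" \
  -d '{
    "triggerSpec": {
      "type": "RECURRING",
      "schedule": "0 3 * * *"
    },
    "executionSpec": {
      "serviceAccount": "<service-account-email>"
    },
    "spark": {
      "mainJarFileUri": "gs://<bucket>/<path>/my-spark-job.jar"
    }
  }'
```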
Schedule a SparkSQL task
gcloud
To schedule a SparkSQL task, run the same gcloud CLI command as in the section above, but with the following additional parameters:
Parameter | Description |
---|---|
--spark-sql-script | The SQL query text. Either spark-sql-script or spark-sql-script-file is required. |
--spark-sql-script-file | A reference to a query file. This can be the Cloud Storage URI of the query file, or the path to the SQL script content. Either spark-sql-script or spark-sql-script-file is required. |
--execution-args | For SparkSQL tasks, the following arguments are mandatory and need to be passed as positional arguments: --output_location, <GCS uri of the output directory> and --output_format, <output file format>. Supported formats are csv, json, parquet, and orc. |
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>
REST
Use the API Explorer to create a task.
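The same REST pattern can be adapted for a SparkSQL task. The sketch below assumes a spark.sqlScriptFile field and that the mandatory output arguments are passed under the TASK_ARGS key of executionSpec.args; both are assumptions to confirm against the API reference.

```bash
# Hedged sketch: create an on-demand SparkSQL task via the v1 REST API (placeholder values).
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://dataplex.googleapis.com/v1/projects/<project-id>/locations/<location>/lakes/<lake-id>/tasks?taskId=<sql-task-id>" \
  -d '{
    "triggerSpec": { "type": "ON_DEMAND" },
    "executionSpec": {
      "serviceAccount": "<service-account-email>",
      "args": {
        "TASK_ARGS": "--output_location,gs://<bucket>/<output-folder>,--output_format,json"
      }
    },
    "spark": {
      "sqlScriptFile": "gs://<bucket>/<path>/query.sql"
    }
  }'
```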
Monitor your task
Console
In the Google Cloud console, go to the Dataplex page:
Navigate to the Process view.
In the Tasks tab, there's a list of tasks, filtered by task template types.
In the Name column, click any task you want to view.
Under Jobs > Job ID, click the job ID of the job you want to view. This opens a Dataproc page in the Google Cloud console where you can see monitoring and output details.
gcloud
The following table lists the gcloud CLI commands for monitoring your tasks.
Action | gcloud CLI command |
---|---|
Listing tasks | gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id> |
Viewing task details | gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
Listing jobs of a task | gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> |
Viewing job details | gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
Dataplex executes jobs on Dataproc Serverless (Batches). To view the execution logs of a Dataplex job, follow these steps:
Get the Dataproc Serverless (Batches) job ID. Run the following command:
gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
View the logs. Run the following command, using the job ID you got from running the previous command:
gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
REST
Use the API Explorer to monitor your task.
Manage the schedule
In the Google Cloud console, within Dataplex, you can edit the schedule of a task, delete a task, or cancel an ongoing job. The following table lists the gcloud CLI commands for these actions.
Action | gcloud CLI command |
---|---|
Edit task schedule | gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id> |
Delete a task | gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
Cancel a job | gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
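As a worked example of the update command, the sketch below uses the --trigger-disabled flag described earlier to pause a recurring task and then re-enable it. The IDs are placeholders, and the --no-trigger-disabled negation is an assumption based on standard gcloud boolean-flag behavior.

```bash
# Hedged sketch: temporarily pause a recurring task (placeholder IDs).
gcloud dataplex tasks update \
    --project=<project-name> --location=<location> --lake=<lake-id> \
    --trigger-disabled <task-id>

# Re-enable it later; --no-trigger-disabled assumes standard gcloud boolean-flag
# negation, so check `gcloud dataplex tasks update --help` to confirm.
gcloud dataplex tasks update \
    --project=<project-name> --location=<location> --lake=<lake-id> \
    --no-trigger-disabled <task-id>
```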
What's next
- See Dataproc templates.
- Try a prebuilt template to move data incrementally from Dataplex Cloud Storage assets to BigQuery.
- See Set up alerts and notifications for Dataplex tasks.