安排自定义 Spark 和 Spark SQL 任务

Dataplex 支持将自定义代码安排为一次性、按计划运行或按需执行。点播处于预览版阶段,只能通过 API 使用。您可以使用 Spark (Java)、PySpark(仅限 Spark 3.2 版本)或 Spark SQL 安排客户数据转换。Dataplex 使用无服务器 Spark 处理和内置无服务器调度程序执行代码。

术语

任务
Dataplex 任务表示您希望 Dataplex 按计划执行的工作。它会封装您的代码、参数和时间表。
Job

作业表示 Dataplex 任务的单次运行。例如,如果计划将任务设置为每天运行,Dataplex 将每天创建一项作业。

对于 2023 年 5 月 10 日或之后创建的作业,触发器字段会显示作业的执行触发器类型。

以下是作业执行触发器类型:

  • RUN_REQUEST:表示作业因调用 RunTask API 而执行。

  • TASK_CONFIG:表示由于任务的 TriggerSpec 配置而执行作业。

调度模式

Dataplex 支持以下调度模式:

运行一次
使用此模式可仅运行一次任务。您可以选择立即运行,也可以安排在将来的某个时间运行。如果立即运行任务,执行最多可能需要两分钟才能开始。
按计划运行
使用此模式重复运行任务。 支持的重复次数为每天、每周、每月或自定义。
按需运行

使用此模式可按需运行之前创建的任务。只有 RunTask API 支持运行模式。当您的作业按需运行时,Dataplex 会使用现有参数来创建作业。您可以指定 ExecutionSpec 参数和标签来执行作业。

准备工作

  1. 启用 Dataproc API。

    启用 Dataproc API

  2. 为您的网络和子网启用专用 Google 访问通道。对用于 Dataplex 任务的网络启用专用 Google 访问通道。如果您在创建 Dataplex 任务时未指定网络或子网,则 Dataplex 会使用默认子网,并且您必须为默认子网启用专用 Google 访问通道。

  3. 创建服务帐号。您必须拥有服务帐号才能安排任何 Dataplex 任务。该服务帐号必须属于您执行任务的项目。 该服务帐号必须具有以下权限:

    • 拥有对正在处理的 BigQuery 和/或 Cloud Storage 数据的访问权限。

    • 您在其中执行任务的项目的 Dataproc Worker Role 权限。

    • 如果任务需要读取或更新附加到数据湖的 Dataproc Metastore 实例,该服务帐号需要具有 Dataproc Metastore Viewer 或 Editor 角色。必须在设置了 Dataplex 数据湖的项目中授予此角色。

    • 如果任务是 Spark SQL 作业,您需要向服务帐号授予 Dataplex Developer 角色。必须在设置了 Dataplex 数据湖的项目中授予此角色。

    • 如果任务是 Spark SQL 作业,您需要拥有写入结果的存储桶的 Cloud Storage 管理员权限。

    • 如需安排和运行 Spark SQL 和自定义 Spark 任务,您的服务帐号必须具有 Dataplex Metadata Reader (roles/dataplex.metadataReader)、Dataplex Viewer (roles/dataplex.viewer) 及 Dataproc Metastore Metadata User (roles/metastore.metadataUser) IAM 角色。

  4. 向用户授予提交服务帐号的 Service Account User 角色 (roles/iam.serviceAccountUser)。有关说明,请参阅管理对服务帐号的访问权限

  5. 向 Dataplex 数据湖服务帐号授予使用服务帐号的权限。您可以在 Google Cloud 控制台数据湖详情页面中找到 Dataplex 数据湖服务帐号。

  6. 如果包含您的 Dataplex 数据湖的项目与要执行任务的项目不同,请为 Dataplex 数据湖服务帐号授予对执行该任务的项目的 Dataproc Editor 角色

  7. 将必需的代码工件(JAR、Python 或 SQL 脚本文件)或归档文件(.jar.tar.tar.gz.tgz.zip)放置在 Cloud Storage 路径中。

  8. 确保服务帐号对存储这些代码工件的 Cloud Storage 存储桶具有所需的 storage.objects.get 权限。

安排 Spark(Java 或 Python)任务

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    转到 Dataplex

  2. 转到 Process(处理)视图。

  3. 点击创建任务

  4. 对于创建自定义 Spark 任务,点击创建任务

  5. 选择 Dataplex 数据湖。

  6. 提供任务名称。

  7. 为您的任务创建 ID

  8. 任务配置部分的类型中,选择 SparkPySpark

  9. 输入相关参数。

  10. 服务帐号字段中,输入自定义 Spark 任务可以使用的用户服务帐号。

  11. 点击继续

  12. 可选:设置时间表:选择运行一次重复。填写必填字段。

  13. 点击继续

  14. 可选:自定义资源添加其他设置

  15. 点击创建

gcloud

您可以使用 gcloud CLI 命令安排 Spark (Java / Python) 任务。下表列出了要使用的必需参数和可选参数:

参数 说明
--lake Dataplex 服务的数据湖资源的数据湖 ID。
--location Dataplex 服务的位置。
--spark-main-class 驱动程序的主类。包含相应类的 jar 文件必须位于默认 CLASSPATH 中。
--spark-main-jar-file-uri 包含主类的 jar 文件的 Cloud Storage URI。
--spark-archive-uris 可选:要提取到每个执行程序的工作目录中的归档的 Cloud Storage URI。支持的文件类型:.jar.tar.tar.gz.tgz.zip
--spark-file-uris 可选:要放入每个执行程序的工作目录中的文件的 Cloud Storage URI。
--batch-executors-count 可选:作业执行程序的总数。默认值为 2。
--batch-max-executors-count 可选:可配置执行程序的数量上限。默认值为 1000。如果 batch-max-executors-count 大于 batch-executors-count,则 Dataplex 会启用自动扩缩功能。
--container-image-java-jars 可选:要添加到类路径的 Java JARS 列表。有效输入包括 Jar 二进制文件的 Cloud Storage URI。
例如 gs://bucket-name/my/path/to/file.jar
--container-image-properties 可选:以 prefix:property 格式指定的属性键。
例如 core:hadoop.tmp.dir
如需了解详情,请参阅集群属性
--vpc-network-tags 可选:要应用于作业的网络标记列表。
--vpc-network-name 可选:运行作业的虚拟私有云网络。默认情况下,Dataplex 使用项目中名为 Default 的 VPC 网络。
您只能使用 --vpc-network-name--vpc-sub-network-name 中的一个。
--vpc-sub-network-name 可选:运行作业的 VPC 子网。
您只能使用 --vpc-sub-network-name--vpc-network-name 中的一个。
--trigger-type 用户指定的任务的触发器类型。值必须为以下项之一:
ON_DEMAND - 任务在创建任务后不久运行一次。
RECURRING - 任务按计划定期运行。
--trigger-start-time 可选:任务的首次运行的时间。格式为“{year}-{month}-{day}T{hour}:{min}:{sec}Z”,其中时区为 UTC。例如,“2017-01-15T01:30:00Z”将编码为 2017 年 1 月 15 日的 01:30 UTC。如果未指定此值,当触发器类型为 ON_DEMAND 时,任务将在提交后运行;如果触发器类型为 RECURRING,则按计划运行任务。
--trigger-disabled 可选:阻止任务执行。此参数不会取消已在运行的任务,但会暂时停用 RECURRING 任务。
--trigger-max-retires 可选:取消前重试次数。将值设为 0 绝不尝试重试失败的任务。
--trigger-schedule Cron 时间表,用于定期运行任务。
--description 可选:任务说明。
--display-name 可选:任务的显示名称。
--labels 可选:要添加的标签 KEY=VALUE 对的列表。
--execution-args 可选:要传递给任务的参数。参数可以组合使用键值对。您可以将以英文逗号分隔的键值对列表作为执行参数传递。如需传递位置参数,请将该键设置为 TASK_ARGS,然后将值设置为所有位置参数的逗号分隔字符串。如需使用英文逗号以外的分隔符,请参阅转义
如果 key-value 和位置参数一起传递,则 TASK_ARGS 将作为最后一个参数传递。
--execution-service-account 用于执行任务的服务帐号。
--max-job-execution-lifetime 可选:作业执行前的最长时长。
--container-image 可选:适用于作业运行时环境的自定义容器映像。如果未指定,系统将使用默认容器映像。
--kms-key 可选:用于加密的 Cloud KMS 密钥,格式如下:
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Java 示例:

glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND –spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>

PySpark 示例:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

如需创建任务,请使用 API Explorer。

安排 Spark SQL 任务

gcloud

如需安排 Spark SQL 任务,请使用与安排 Spark(Java 或 Python)任务相同的 gcloud CLI 命令,但使用以下额外参数:

参数 说明
--spark-sql-script SQL 查询文本。必须提供 spark-sql-scriptspark-sql-script-file
--spark-sql-script-file 对查询文件的引用。此值可以是查询文件的 Cloud Storage URI,也可以是 SQL 脚本内容的路径。必须提供 spark-sql-scriptspark-sql-script-file
--execution-args 对于 Spark SQL 任务,必须选择以下参数作为位置参数:
--output_location, <GCS uri of the output directory>
--output_format, <output file format>
支持的格式为 csv、json、parquet 和 orc。
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

如需创建任务,请使用 API Explorer。

监控任务

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    转到 Dataplex

  2. 转到 Process(处理)视图。

  3. 任务标签页中有一个任务列表,按任务模板类型过滤。

  4. 名称列中,点击要查看的任何任务。

  5. 点击要查看的任务的作业 ID

    Dataproc 页面会在 Google Cloud 控制台中打开,您可以在其中查看监控和输出详细信息。

gcloud

下表列出了用于监控任务的 gcloud CLI 命令。

操作 gcloud CLI 命令
列出任务 gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
查看任务详细信息 gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
列出任务 gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
查看作业详情 gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex 执行 Dataproc 无服务器(批量)作业。如需查看 Dataplex 作业的执行日志,请按以下步骤操作:

  1. 获取 Dataproc 无服务器(批量)作业 ID。运行以下命令:

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. 查看日志。运行以下命令,使用运行上一个命令获得的作业 ID:

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

如需对任务或作业执行 getlist 操作,请使用 API Explorer。

管理时间表

在 Google Cloud 控制台中,您可以在 Dataplex 中修改任务时间表、删除任务或取消正在进行的作业。下表列出了这些操作的 gcloud CLI 命令。

操作 gcloud CLI 命令
修改任务计划 gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
删除任务 gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
取消作业 gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

后续步骤