Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍了如何使用 Cloud Composer 2 在Google Cloud上运行 Dataproc Serverless 工作负载。
以下部分中的示例介绍了如何使用运算符管理 Dataproc Serverless 批处理工作负载。您可以在用于创建、删除、列出和获取 Dataproc Serverless Spark 批处理工作负载的 DAG 中使用以下运算符:
为与 Dataproc Serverless 批处理工作负载搭配使用的运算符创建 DAG:
创建使用自定义容器和 Dataproc Metastore 的 DAG。
为这些 DAG 配置永久性历史记录服务器。
准备工作
启用 Dataproc API:
控制台
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
选择批处理工作负载文件的位置。您可以使用以下任一选项:
- 创建用于存储此文件的 Cloud Storage 存储分区。
- 使用环境的存储分区。由于您无需将此文件与 Airflow 同步,因此可以在
/dags
或/data
文件夹之外创建单独的子文件夹。例如/batches
。 - 使用现有存储分区。
设置文件和 Airflow 变量
本部分将演示如何为本教程设置文件并配置 Airflow 变量。
将 Dataproc Serverless Spark 机器学习工作负载文件上传到存储分区
本教程中的工作负载会运行一个 pyspark 脚本:
将任何 pyspark 脚本保存到名为
spark-job.py
的本地文件中。 例如,您可以使用 PySpark 示例脚本。
设置 Airflow 变量
以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码就可以访问这些值。
本教程中的示例使用以下 Airflow 变量。您可以根据需要进行设置,具体取决于您使用的示例。
设置以下 Airflow 变量,以便在 DAG 代码中使用:
project_id
:项目 ID。bucket_name
:工作负载的主要 Python 文件 (spark-job.py
) 所在的存储分区的 URI。您已在准备工作中选择了此位置。phs_cluster
:永久性历史记录服务器集群名称。您可以在创建永久性历史记录服务器时设置此变量。image_name
:自定义容器映像的名称和标记 (image:tag
)。您在将 DataprocCreateBatchOperator 与自定义容器映像搭配使用时设置此变量。metastore_cluster
:Dataproc Metastore 服务名称。 在将 DataprocCreateBatchOperator 与 Dataproc Metastore 服务搭配使用时,您需要设置此变量。region_name
:Dataproc Metastore 服务所在的区域。在将 DataprocCreateBatchOperator 与 Dataproc Metastore 服务搭配使用时,您需要设置此变量。
使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击与您的环境对应的 Airflow 链接。Airflow 界面随即打开。
在 Airflow 界面中,依次选择 Admin > Variables。
点击添加新记录。
在键字段中指定变量的名称,并在 Val 字段中为其设置值。
点击保存。
创建永久性历史记录服务器
使用 Persistent History Server (PHS) 查看批处理工作负载的 Spark 历史记录文件:
- 创建永久性历史记录服务器。
- 确保您已在
phs_cluster
Airflow 变量中指定 PHS 集群的名称。
DataprocCreateBatchOperator
以下 DAG 会启动 Dataproc Serverless 批处理工作负载。
如需详细了解 DataprocCreateBatchOperator
参数,请参阅运算符的源代码。
如需详细了解您可以在 DataprocCreateBatchOperator
的 batch
参数中传入的属性,请参阅 Batch 类的说明。
将自定义容器映像与 DataprocCreateBatchOperator 搭配使用
以下示例展示了如何使用自定义容器映像运行工作负载。例如,您可以使用自定义容器添加默认容器映像未提供的 Python 依赖项。
如需使用自定义容器映像,请执行以下操作:
在
image_name
Airflow 变量中指定映像。将 DataprocCreateBatchOperator 与自定义映像搭配使用:
将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用
如需从 DAG 使用 Dataproc Metastore 服务,请执行以下操作:
检查元存储空间服务是否已启动。
如需了解如何启动 Metastore 服务,请参阅启用和停用 Dataproc Metastore。
如需详细了解用于创建配置的批处理运算符,请参阅 PeripheralsConfig。
Metastore 服务启动并运行后,在
metastore_cluster
变量中指定其名称,并在region_name
Airflow 变量中指定其区域。在 DataprocCreateBatchOperator 中使用 Metastore 服务:
DataprocDeleteBatchOperator
您可以使用 DataprocDeleteBatchOperator 基于工作负载的批处理 ID 删除批处理。
DataprocListBatchesOperator
DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批处理。
DataprocGetBatchOperator
DataprocGetBatchOperator 用于提取一个特定的批处理工作负载。