Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本教程介绍如何使用 Cloud Composer 创建在 Dataproc 集群上运行 Apache Hadoop WordCount 作业的 Apache Airflow DAG(有向无环图)。
目标
- 访问您的 Cloud Composer 环境并使用 Airflow 界面。
- 创建和查看 Airflow 环境变量。
- 创建和运行包含以下任务的 DAG:
- 创建 Dataproc 集群。
- 在该集群上运行 Apache Hadoop WordCount 作业。
- 将单词数结果输出到 Cloud Storage 存储分区。
- 删除集群。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Cloud Composer
- Dataproc
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
准备工作
确保已在项目中启用以下 API:
控制台
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com 在您的项目中,创建任意存储类别的 Cloud Storage 存储桶和区域,以用于存储 Hadoop WordCount 作业的结果。
记下您创建的存储桶的路径,例如
gs://example-bucket
。您将为此路径定义一个 Airflow 变量, 使用本教程后面部分的示例 DAG 中的变量。创建一个 Cloud Composer 环境,默认环境 参数。等待环境创建操作完成。完成后, 相应环境名称左侧会显示绿色对勾标记。
请记住您在其中创建了环境的区域,例如
us-central
。您将为此区域定义一个 Airflow 变量,并在示例 DAG 中使用该变量在同一区域运行 Dataproc 集群。
设置 Airflow 变量
设置 Airflow 变量 以便稍后在示例 DAG 中使用。例如,您 可以在 Airflow 界面中设置 Airflow 变量。
Airflow 变量 | 值 |
---|---|
gcp_project
|
项目的项目 ID
例如 example-project 。 |
gcs_bucket
|
您为本教程创建的 URI 中的 Cloud Storage 存储分区。
例如 gs://example-bucket |
gce_region
|
您创建环境的区域,例如 us-central1 。
这是将在其中创建 Dataproc 集群的区域。 |
查看示例工作流
Airflow DAG 是您要调度的一组有序任务
并运行DAG 在标准 Python 文件中定义。代码如下所示:
hadoop_tutorial.py
是工作流代码。
运算符
为了编排示例工作流程中的三个任务,DAG 导入以下三个 Airflow 运算符:
DataprocClusterCreateOperator
:用于创建 Dataproc 集群。DataProcHadoopOperator
:提交 Hadoop Wordcount 作业并写入结果 Cloud Storage 存储桶DataprocClusterDeleteOperator
:用于删除集群,以避免产生持续的 Compute Engine 费用。
依赖项
您可以采用一种能够反映任务关系和依赖项的方式来组织要运行的任务。此 DAG 中的任务将按顺序运行。
时间安排
此 DAG 的名称为 composer_hadoop_tutorial
,此 DAG 各运行一次
。由于传递到 default_dag_args
的 start_date
设置为 yesterday
,因此 Cloud Composer 会安排工作流在 DAG 上传到环境的存储桶后立即开始运行。
将该 DAG 上传到环境的存储桶
Cloud Composer 将 DAG 存储在您的/dags
环境的存储桶中。
如需上传 DAG,请执行以下操作:
在本地机器上,保存
hadoop_tutorial.py
。在 Google Cloud 控制台中,前往环境页面。
在环境列表的 DAGs 文件夹列中, 环境中,点击 DAG 链接。
点击上传文件。
在本地机器上选择
hadoop_tutorial.py
,然后点击打开。
Cloud Composer 会将此 DAG 添加到 Airflow,并自动安排此 DAG。DAG 会在 3 到 5 分钟内发生更改。
探索 DAG 运行
查看任务状态
当您将 DAG 文件上传到 Cloud Storage 中的 dags/
文件夹时,Cloud Composer 会解析该文件。成功完成后,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。
如需查看任务状态,请转到 Airflow 网页界面并点击工具栏中的 DAGs。
如需打开 DAG 详情页面,请点击
composer_hadoop_tutorial
。这个 页面包含工作流任务和 依赖项如需查看每个任务的状态,请点击 Graph View,然后将鼠标悬停在每个任务对应的图形上。
将工作流重新加入队列
如需从 Graph View 重新运行工作流,请执行以下操作:
- 在 Airflow 界面的“Graph View”中,点击
create_dataproc_cluster
图形。 - 如需重置三个任务,请点击 Clear,然后点击 OK 进行确认。
- 在“Graph View”中,再次点击
create_dataproc_cluster
。 - 如需将工作流重新加入队列,请点击 Run。
查看任务结果
您还可以转到以下 Google Cloud 控制台页面来检查 composer_hadoop_tutorial
工作流的状态和结果:
Dataproc 集群:可以监控集群的创建和删除。请注意,工作流创建的集群是临时的: 仅在工作流期间存在,并且会在 最后一个工作流任务。
Dataproc 作业:可以查看或监控 Apache Hadoop Wordcount 作业。点击“作业 ID”即可查看作业日志输出。
Cloud Storage 浏览器:在 您创建的 Cloud Storage 存储桶中的
wordcount
文件夹
清理
删除本教程中使用的资源:
删除 Cloud Composer 环境,包括 手动删除环境的存储桶
删除您要在其中 存储 Hadoop 字数统计作业的结果。