Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本教程介绍如何使用 Cloud Composer 创建 Apache Airflow DAG(有向无环图) 在 Dataproc 上运行 Apache Hadoop WordCount 作业, 集群。
目标
- 访问您的 Cloud Composer 环境并使用 Airflow 界面。
- 创建和查看 Airflow 环境变量。
- 创建和运行包含以下任务的 DAG:
- 创建 Dataproc 集群。
- 在该集群上运行 Apache Hadoop WordCount 作业。
- 将单词数结果输出到 Cloud Storage 存储分区。
- 删除集群。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Cloud Composer
- Dataproc
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
准备工作
确保已在项目中启用以下 API:
控制台
Enable the Dataproc, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com
storage-component.googleapis.com 在项目中 创建 Cloud Storage 存储桶 用于存储 Hadoop 的 计算词汇数。
记下您创建的存储桶的路径,例如
gs://example-bucket
。您将为此路径定义一个 Airflow 变量, 使用本教程后面部分的示例 DAG 中的变量。创建一个 Cloud Composer 环境,默认环境 参数。等待环境创建操作完成。完成后, 相应环境名称左侧会显示绿色对勾标记。
记下您创建环境的区域,例如
us-central
。您将为此区域定义一个 Airflow 变量 并在示例 DAG 中使用它来运行 Dataproc 集群 同一区域内的
设置 Airflow 变量
设置 Airflow 变量,以便稍后在示例 DAG 中使用。例如,您可以在 Airflow 界面中设置 Airflow 变量。
Airflow 变量 | 值 |
---|---|
gcp_project
|
项目的项目 ID
例如 example-project 。 |
gcs_bucket
|
您为本教程创建的 Cloud Storage 存储桶的 URI,例如 gs://example-bucket |
gce_region
|
您创建环境的区域,例如 us-central1 。这是您的 Dataproc 集群所在的区域
创建项目。 |
查看示例工作流
Airflow DAG 是您要安排和运行的有序任务的集合。DAG 在标准 Python 文件中定义。hadoop_tutorial.py
中显示的代码就是工作流代码。
运算符
为了编排示例工作流程中的三个任务,DAG 导入以下三个 Airflow 运算符:
DataprocClusterCreateOperator
:用于创建 Dataproc 集群。DataProcHadoopOperator
:提交 Hadoop Wordcount 作业并写入结果 Cloud Storage 存储桶DataprocClusterDeleteOperator
:用于删除集群,以避免产生持续的 Compute Engine 费用。
依赖项
您可以采用一种能够反映任务关系和依赖项的方式来组织要运行的任务。此 DAG 中的任务将按顺序运行。
时间安排
此 DAG 的名称为 composer_hadoop_tutorial
,此 DAG 各运行一次
。由于传递到 default_dag_args
的 start_date
设置为 yesterday
,因此 Cloud Composer 会安排工作流在 DAG 上传到环境的存储桶后立即开始运行。
将该 DAG 上传到环境的存储桶
Cloud Composer 会将 DAG 存储在环境存储桶的 /dags
文件夹中。
如需上传 DAG,请执行以下操作:
在本地机器上,保存
hadoop_tutorial.py
。在 Google Cloud 控制台中,前往环境页面。
在环境列表的 DAGs 文件夹列中, 环境中,点击 DAG 链接。
点击上传文件。
在本地机器上选择
hadoop_tutorial.py
,然后点击打开。
Cloud Composer 会将此 DAG 添加到 Airflow,并自动安排此 DAG。DAG 会在 3 到 5 分钟内发生更改。
探索 DAG 运行
查看任务状态
当您将 DAG 文件上传到 Cloud Storage 中的 dags/
文件夹时,Cloud Composer 会解析该文件。成功完成后,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。
如需查看任务状态,请转到 Airflow 网页界面并点击工具栏中的 DAGs。
如需打开 DAG 详情页面,请点击
composer_hadoop_tutorial
。这个 页面包含工作流任务和 依赖项如需查看每个任务的状态,请点击 Graph View,然后将鼠标悬停在每个任务对应的图形上。
将工作流重新加入队列
如需从 Graph View 重新运行工作流,请执行以下操作:
- 在 Airflow 界面的“Graph View”中,点击
create_dataproc_cluster
图形。 - 如需重置三个任务,请点击 Clear,然后点击 OK 进行确认。
- 在“Graph View”中,再次点击
create_dataproc_cluster
。 - 如需将工作流重新加入队列,请点击 Run。
查看任务结果
您还可以查看composer_hadoop_tutorial
的状态和结果
工作流:
Dataproc 集群:用于监控集群创建和 删除。请注意,由工作流创建的集群是临时性的,也就是说,此类集群仅在工作流的持续期间内存在,并且将在最后一个工作流任务的执行过程中删除。
Dataproc 作业:用于查看或监控 Apache Hadoop WordCount 作业。点击“作业 ID”即可查看作业日志输出。
Cloud Storage 浏览器:可以在您为本教程创建的 Cloud Storage 存储桶所含的
wordcount
文件夹中查看 WordCount 的结果。
清理
删除本教程中使用的资源: