Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本教程介绍如何使用 Cloud Composer 创建 Apache Airflow DAG。该 DAG 会联接来自 BigQuery 公共数据集和存储在 Cloud Storage 存储桶中的 CSV 文件的数据,然后运行 Dataproc Serverless 批量作业来处理联接的数据。
本教程中的 BigQuery 公共数据集是 ghcn_d,这是一个包含全球气候摘要的集成数据库。该 CSV 文件包含 1997 年至 2021 年美国节假日的日期和名称信息。
我们想使用 DAG 回答的问题是:“过去 25 年来,芝加哥的感恩节有多暖和?”
目标
- 以默认配置创建 Cloud Composer 环境
- 创建空的 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建并运行包含以下任务的 DAG:
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
启用 API
启用以下 API:
控制台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
向您的用户账号授予以下角色和权限:
授予 BigQuery Data Owner (
roles/bigquery.dataOwner
) 角色,以便创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin
) 角色以创建 Cloud Storage 存储桶。
创建并准备 Cloud Composer 环境
使用默认参数创建 Cloud Composer 环境:
- 选择美国境内的区域。
- 选择最新的 Cloud Composer 版本。
请向 Cloud Composer 环境中使用的服务账号授予以下角色,以便 Airflow 工作器成功运行 DAG 任务:
- BigQuery User (
roles/bigquery.user
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Service Account User (
roles/iam.serviceAccountUser
) - Dataproc Editor (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery User (
创建相关资源
创建一个空的 BigQuery 数据集,并使用以下参数:
- 名称:
holiday_weather
- 区域:
US
- 名称:
在
US
多区域中创建新的 Cloud Storage 存储桶。运行以下命令,在您希望运行 Dataproc Serverless 的区域中的默认子网上启用专用 Google 访问通道,以满足网络要求。建议使用与 Cloud Composer 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
使用 Dataproc Serverless 进行数据处理
探索示例 PySpark 作业
以下代码是一个 PySpark 作业示例,用于将温度从十分之一摄氏度转换为摄氏度。此作业将数据集中的温度数据转换为其他格式。
将支持文件上传到 Cloud Storage
如需上传 PySpark 文件和存储在 holidays.csv
中的数据集,请执行以下操作:
将 data_analytics_process.py 保存到本地机器。
将 holidays.csv 保存到本地机器。
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:
点击您之前创建的存储桶的名称。
在相应存储桶的对象标签页中,点击上传文件按钮,在随即显示的对话框中选择
data_analytics_process.py
和holidays.csv
,然后点击打开。
数据分析 DAG
探索示例 DAG
该 DAG 使用多个运算符来转换和统一数据:
GCSToBigQueryOperator
将 Cloud Storage 中的 holidays.csv 文件提取到您之前创建的 BigQueryholidays_weather
数据集中的新表。DataprocCreateBatchOperator
使用 Dataproc Serverless 创建并运行 PySpark 批量作业。BigQueryInsertJobOperator
将 holidays.csv 中的数据与 BigQuery 公共数据集 ghcn_d 中的天气数据联接起来。BigQueryInsertJobOperator
任务是使用 for 循环动态生成的,这些任务位于TaskGroup
中,以便在 Airflow 界面的“图表视图”中获得更好的可读性。
使用 Airflow 界面添加变量
在 Airflow 中,变量是一种通用方式,可用于以简单的键值存储区形式存储和检索任意设置或配置。此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境中,请执行以下操作:
依次前往管理 > 变量。
添加以下变量:
gcp_project
:您的项目 ID。gcs_bucket
:您之前创建的存储桶的名称(不带gs://
前缀)。gce_region
:您希望运行 Dataproc 作业的区域,该作业需满足 Dataproc Serverless 网络要求。这是您之前启用专用 Google 访问通道的区域。dataproc_service_account
:Cloud Composer 环境的服务账号。您可以在 Cloud Composer 环境的环境配置标签页中找到此服务账号。
将 DAG 上传到您的环境的存储桶
Cloud Composer 会安排位于环境存储桶的 /dags
文件夹中的 DAG。如需使用Google Cloud 控制台上传 DAG,请执行以下操作:
在本地机器上,保存 data_analytics_dag.py。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击 DAG 文件夹列中的 DAGs 链接。系统会打开您环境的 DAGs 文件夹。
点击上传文件。
在本地机器上选择
data_analytics_dag.py
,然后点击打开。
触发 DAG
在 Cloud Composer 环境中,点击 DAG 标签页。
点击 DAG ID
data_analytics_dag
。点击 Trigger DAG。
等待大约 5 到 10 分钟,直到您看到绿色对勾标记,这表示任务已成功完成。
验证 DAG 是否成功运行
在 Google Cloud 控制台中,前往 BigQuery 页面。
在浏览器面板中,点击您的项目名称。
点击
holidays_weather_joined
。点击预览以查看生成的表格。请注意,值列中的数字以摄氏度为单位,精确到十分之一。
点击
holidays_weather_normalized
。点击预览以查看生成的表格。请注意,“值”列中的数字以摄氏度为单位。
深入了解 Dataproc Serverless(可选)
您可以尝试使用更复杂的 PySpark 数据处理流程来运行此 DAG 的高级版本。请参阅 GitHub 上的数据分析示例的 Dataproc 扩展程序。
清理
删除您为本教程创建的各个资源:
删除您为本教程创建的 Cloud Storage 存储桶。
删除 Cloud Composer 环境,包括手动删除环境的存储桶。
后续步骤
- 在 Google Cloud 使用来自 AWS 的数据中运行数据分析 DAG。
- 在 Azure 中运行数据分析 DAG。