Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本教程是对在 Google Cloud 中运行数据分析 DAG 的修改,介绍了如何将 Cloud Composer 环境连接到 Amazon Web Services 以利用其中存储的数据。其中说明了如何 使用 Cloud Composer 创建 Apache Airflow DAG。DAG 会联接 BigQuery 公开数据集和存储在 Amazon Web Services (AWS) S3 存储桶中的 CSV 文件中的数据,然后运行 Dataproc Serverless 批处理作业来处理联接后的数据。
本教程中的 BigQuery 公共数据集是 ghcn_d:一个收集全球气候变化概况的综合数据库 地球。CSV 文件包含相关信息 1997 年至 2021 年期间美国节假日的日期和名称。
我们想通过该 DAG 回答以下问题:“芝加哥的天气有多暖 感恩节怎么办?”
目标
- 使用默认配置创建 Cloud Composer 环境
- 在 AWS S3 中创建存储桶
- 创建空 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建和运行包含以下任务的 DAG:
- 将外部数据集从 S3 加载到 Cloud Storage
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
在 AWS 中管理权限
按照“创建 IAM 政策”AWS 教程的“使用可视化编辑器创建政策”部分中的说明,使用以下配置为 AWS S3 创建自定义 IAM 政策:
- 服务:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
),用于查看 S3 存储桶 - CreateBucket (
s3:CreateBucket
),用于创建存储桶 - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
),用于创建存储桶 - ListBucket (
s3:ListBucket
),用于授予列出 S3 存储桶中对象的权限 - PutObject (
s3:PutObject
),用于向存储桶上传文件 - GetBucketVersioning (
s3:GetBucketVersioning
),用于删除存储桶中的对象 - DeleteObject (
s3:DeleteObject
),用于删除存储桶中的对象 - ListBucketVersions (
s3:ListBucketVersions
),用于删除存储桶 - DeleteBucket (
s3:DeleteBucket
),用于删除存储桶 - 资源:选择“存储桶”和“对象”旁边的“任意”,即可向该类型的任何资源授予权限。
- 标记:无
- 名称:TutorialPolicy
如需详细了解上述每个配置,请参阅 Amazon S3 支持的操作列表。
启用 API
启用以下 API:
控制台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
向您的用户账号授予以下角色和权限:
授予 BigQuery Data Owner (
roles/bigquery.dataOwner
) 角色以创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin
) 角色,以创建 Cloud Storage 存储桶。
创建并准备 Cloud Composer 环境
使用默认参数创建 Cloud Composer 环境:
- 选择美国境内的某个区域。
- 选择最新的 Cloud Composer 版本。
向 Cloud Composer 环境中使用的服务账号授予以下角色,以便 Airflow 工作器能够成功运行 DAG 任务:
- BigQuery User (
roles/bigquery.user
) - BigQuery 数据所有者 (
roles/bigquery.dataOwner
) - Service Account User (
roles/iam.serviceAccountUser
) - Dataproc Editor (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery User (
在 Google Cloud 中创建和修改相关资源
在 Cloud Composer 环境中安装
apache-airflow-providers-amazon
PyPI 软件包。创建空 BigQuery 数据集 参数如下:
- 名称:
holiday_weather
- 区域:
US
- 名称:
创建新的 Cloud Storage 存储桶
US
多区域位置。运行以下命令 启用专用 Google 访问通道 您要运行的区域中的默认子网上 Dataproc Serverless 可满足的要求 网络要求。我们建议您使用与 Cloud Composer 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 AWS 中创建相关资源
创建 S3 存储桶 使用您首选区域的默认设置。
从 Cloud Composer 连接到 AWS
- 获取您的 AWS 访问密钥 ID 和私有访问密钥
使用 Airflow 界面添加 AWS S3 连接:
- 依次选择管理 > 关联。
使用以下配置创建新连接:
- 连接 ID:
aws_s3_connection
- 连接类型:
Amazon S3
- Extra:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- 连接 ID:
使用 Dataproc Serverless 进行数据处理
探索 PySpark 作业示例
以下代码是一个 PySpark 作业示例,用于将温度从十分之一摄氏度转换为摄氏度。此招聘信息将 转换为不同的格式。
将 PySpark 文件上传到 Cloud Storage
如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:
将 data_analytics_process.py 保存到本地机器。
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:
点击您之前创建的存储桶的名称。
在相应存储桶的对象标签页中,点击上传文件按钮,在随即显示的对话框中选择
data_analytics_process.py
,然后点击打开。
将 CSV 文件上传到 AWS S3
如需上传 holidays.csv
文件,请执行以下操作:
- 将
holidays.csv
保存在本地计算机上。 - 按照 AWS 指南 将该文件上传到您的存储桶。
数据分析 DAG
探索示例 DAG
DAG 使用多个运算符来转换和统一数据:
通过
S3ToGCSOperator
会将 holidays.csv 文件从 将 AWS S3 存储桶复制到您的 Cloud Storage 存储桶。通过
GCSToBigQueryOperator
会从以下代码中注入 holidays.csv 文件: 将 Cloud Storage 复制到 BigQuery 中的新表 您之前创建的holidays_weather
数据集。通过
DataprocCreateBatchOperator
使用命令行创建并运行 PySpark 批量作业 Dataproc Serverless。BigQueryInsertJobOperator
会将 holidays.csv 中的数据与 BigQuery 公共数据集 ghcn_d 中的数据按“日期”列联接。BigQueryInsertJobOperator
任务 使用 for 循环动态生成,而这些任务位于TaskGroup
以提高 Airflow 界面的图表视图的可读性。
使用 Airflow 界面添加变量
在 Airflow 中 variables 是一种用于存储和检索任意设置 存储为简单的键值对此 DAG 使用 Airflow 变量 存储常用值。如需将其添加到您的环境中,请执行以下操作:
依次前往管理 > 变量。
添加以下变量:
s3_bucket
:您之前创建的 S3 存储桶的名称。gcp_project
:您的项目 ID。gcs_bucket
:您之前创建的存储桶的名称(不带gs://
前缀)。gce_region
:您希望 Dataproc 作业, Dataproc 无服务器网络要求。 这是您之前启用了专用 Google 访问通道的区域。dataproc_service_account
:您的 Cloud Composer 环境。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务账号。
将 DAG 上传到您环境的存储桶
Cloud Composer 会安排位于环境存储桶的 /dags
文件夹中的 DAG。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:
在本地机器上,保存 s3togcsoperator_tutorial.py。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中的 DAG 文件夹列中,点击 DAG 链接。系统会打开您的环境的 DAGs 文件夹。
点击上传文件。
在本地机器上选择
s3togcsoperator_tutorial.py
,然后点击打开。
触发 DAG
在您的 Cloud Composer 环境中,点击 DAG 标签页。
点击 DAG ID
s3_to_gcs_dag
。点击触发 DAG。
等待大约 5 到 10 分钟,直到您看到绿色对勾,表示任务已成功完成。
验证 DAG 是否成功
在 Google Cloud 控制台中,转到 BigQuery 页面。
在浏览器面板中,点击您的项目名称。
点击
holidays_weather_joined
。点击“预览”可查看生成的表。请注意, 值列的单位为十分之一摄氏度。
点击
holidays_weather_normalized
。点击“预览”可查看生成的表。请注意, 值列的单位是摄氏度。
清理
删除您为本教程创建的各个资源:
删除
holidays.csv
文件 存储在您的 AWS S3 存储桶中删除 AWS S3 存储桶 创建的所有文件
删除 Cloud Composer 环境,包括 手动删除环境的存储桶