Cloud Composer 1 | Cloud Composer 2
本教程修改了在 Google Cloud 中运行数据分析 DAG,其中介绍了如何将 Cloud Composer 环境连接到 Amazon Web Services 以利用存储在其中的数据。它展示了如何使用 Cloud Composer 创建 Apache Airflow DAG。DAG 将 BigQuery 公开数据集的数据与存储在 Amazon Web Services (AWS) S3 存储桶中的 CSV 文件联接起来,然后运行 Dataproc 无服务器批量作业来处理联接的数据。
本教程中的 BigQuery 公共数据集是 ghcn_d,这是一个全球气候摘要的集成数据库。该 CSV 文件包含有关 1997 年至 2021 年美国节假日的日期和名称信息。
我们想要使用 DAG 回答的问题是:“过去 25 年中芝加哥的感恩节温度有多高?”
目标
- 使用默认配置创建 Cloud Composer 环境
- 在 AWS S3 中创建存储桶
- 创建空 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建并运行包含以下任务的 DAG:
- 将外部数据集从 S3 加载到 Cloud Storage
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
在 AWS 中管理权限
按照创建 IAM 政策 AWS 教程中的“使用可视化编辑器创建政策”部分,使用以下配置为 AWS S3 创建自定义 IAM 政策:
- 服务:S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
),用于查看 S3 存储桶 - CreateBucket (
s3:CreateBucket
),用于创建存储桶 - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
),用于创建存储桶 - ListBucket (
s3:ListBucket
),用于授予列出 S3 存储桶中的对象的权限 - PutObject (
s3:PutObject
),用于将文件上传到存储桶 - GetBucketVersioning (
s3:GetBucketVersioning
),用于删除存储桶中的对象 - DeleteObject (
s3:DeleteObject
),用于删除存储桶中的对象 - ListBucketVersions (
s3:ListBucketVersions
),用于删除存储桶 - DeleteBucket (
s3:DeleteBucket
),用于删除存储桶 - 资源:选择“存储桶”和“对象”旁边的“不限”,以授予对该类型的任何资源的权限。
- 标记:无
- 名称:TutorialPolicy
如需详细了解上述每种配置,请参阅 Amazon S3 中支持的操作列表。
启用 API
启用以下 API:
控制台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
为您的用户帐号授予以下角色和权限:
授予 BigQuery Data Owner (
roles/bigquery.dataOwner
) 角色以创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin
) 角色以创建 Cloud Storage 存储桶。
创建并准备 Cloud Composer 环境
使用默认参数创建一个 Cloud Composer 环境:
- 选择一个美国境内的区域。
- 选择最新的 Cloud Composer 版本。
向 Cloud Composer 环境中使用的服务帐号授予以下角色,以使 Airflow 工作器成功运行 DAG 任务:
- BigQuery 用户 (
roles/bigquery.user
) - BigQuery 数据所有者 (
roles/bigquery.dataOwner
) - Service Account User (
roles/iam.serviceAccountUser
) - Dataproc 编辑者 (
roles/dataproc.editor
) - Dataproc 工作器 (
roles/dataproc.worker
)
- BigQuery 用户 (
在 Google Cloud 中创建和修改相关资源
在 Cloud Composer 环境中安装
apache-airflow-providers-amazon
PyPI 软件包。使用以下参数创建一个空 BigQuery 数据集:
- 名称:
holiday_weather
- 区域:
US
- 名称:
在
US
多区域中创建新的 Cloud Storage 存储桶。运行以下命令,以在要运行 Dataproc Serverless 以满足网络要求的区域中的默认子网上启用专用 Google 访问通道。我们建议您使用与 Cloud Composer 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 AWS 中创建相关资源
在首选区域中使用默认设置创建一个 S3 存储桶。
从 Cloud Composer 连接到 AWS
- 获取您的 AWS 访问密钥 ID 和私有访问密钥
使用 Airflow 界面添加您的 AWS S3 连接:
- 依次前往管理 > 连接。
使用以下配置创建新连接:
- 连接 ID:
aws_s3_connection
- 连接类型:
Amazon S3
- Extra:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- 连接 ID:
使用 Dataproc Serverless 进行数据处理
探索示例 PySpark 作业
以下代码是一个示例 PySpark 作业,可将温度从摄氏度的十分之一度转换为摄氏度。此作业会将数据集的温度数据转换为其他格式。
将 PySpark 文件上传到 Cloud Storage
要将 PySpark 文件上传到 Cloud Storage,请执行以下操作:
将 data_analytics_process.py 保存到您的本地计算机。
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:
点击您之前创建的存储桶的名称。
在存储桶的对象标签页中,点击上传文件按钮,在显示的对话框中选择
data_analytics_process.py
,然后点击打开。
将 CSV 文件上传到 AWS S3
如需上传 holidays.csv
文件,请执行以下操作:
- 将
holidays.csv
保存到本地机器上。 - 按照 AWS 指南将文件上传到您的存储桶。
数据分析 DAG
探索示例 DAG
DAG 使用多个运算符来转换和统一数据:
S3ToGCSOperator
将 holidays.csv 文件从您的 AWS S3 存储桶转移到您的 Cloud Storage 存储桶。GCSToBigQueryOperator
将 holidays.csv 文件从 Cloud Storage 提取到您之前创建的 BigQueryholidays_weather
数据集中的新表。DataprocCreateBatchOperator
使用 Dataproc Serverless 创建并运行 PySpark 批量作业。BigQueryInsertJobOperator
将“Date”列的 holidays.csv 数据与 BigQuery 公共数据集 ghcn_d 中的天气数据相联接。BigQueryInsertJobOperator
任务是使用 for 循环动态生成的,并且这些任务位于TaskGroup
中,以便在 Airflow 界面的图表视图中提高可读性。
使用 Airflow 界面添加变量
在 Airflow 中,变量是一种将任意设置或配置作为简单的键值对存储进行存储和检索的通用方式。此 DAG 使用 Airflow 变量存储常用值。如需将其添加到您的环境中,请执行以下操作:
依次前往管理 > 变量。
添加以下变量:
s3_bucket
:您之前创建的 S3 存储桶的名称。gcp_project
:您的项目 ID。gcs_bucket
:您之前创建的存储桶的名称(不带gs://
前缀)。gce_region
:您希望 Dataproc 作业在其中满足 Dataproc 无服务器网络要求的区域。这是您之前启用了专用 Google 访问通道的区域。dataproc_service_account
:您的 Cloud Composer 环境的服务帐号。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务帐号。
将该 DAG 上传到环境的存储桶
Cloud Composer 会安排位于环境存储桶的 /dags
文件夹中的 DAG。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:
在本地计算机上,保存 s3togcsoperator_tutorial.py。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击 DAG 文件夹列中的 DAG 链接。系统会打开您的环境的 DAGs 文件夹。
点击上传文件。
在本地机器上选择
s3togcsoperator_tutorial.py
,然后点击打开。
触发 DAG
在 Cloud Composer 环境中,点击 DAG 标签页。
点击进入 DAG ID
s3_to_gcs_dag
。点击触发 DAG。
等待大约 5 到 10 分钟,直到显示表示任务已成功完成的绿色对勾标记。
验证 DAG 是否成功
在 Google Cloud 控制台中,转到 BigQuery 页面。
在浏览器面板中,点击您的项目名称。
点击
holidays_weather_joined
。点击“预览”可查看生成的表。请注意,值列中的数字以摄氏度的十分之一为单位。
点击
holidays_weather_normalized
。点击“预览”可查看生成的表。请注意,值列中的数字以摄氏度为单位。
清理
删除您为本教程创建的各个资源:
删除 Cloud Composer 环境,包括手动删除该环境的存储桶。