Cloud Composer 1 | Cloud Composer 2
本教程修改了在 Google Cloud 中运行数据分析 DAG,其中介绍了如何将 Cloud Composer 环境连接到 Microsoft Azure 以利用存储在 Microsoft Azure 中的数据。它展示了如何使用 Cloud Composer 创建 Apache Airflow DAG。DAG 将来自 BigQuery 公共数据集和存储在 Azure Blob Storage 中的 CSV 文件的数据联接起来,然后运行 Dataproc 无服务器批量作业来处理联接的数据。
本教程中的 BigQuery 公共数据集是 ghcn_d,这是一个全球气候摘要的集成数据库。该 CSV 文件包含有关 1997 年至 2021 年美国节假日的日期和名称信息。
我们想要使用 DAG 回答的问题是:“过去 25 年中芝加哥的感恩节温度有多高?”
目标
- 使用默认配置创建 Cloud Composer 环境
- 在 Azure 中创建 blob
- 创建空 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建并运行包含以下任务的 DAG:
- 将外部数据集从 Azure Blob Storage 加载到 Cloud Storage
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
启用 API
启用以下 API:
控制台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
为您的用户帐号授予以下角色和权限:
授予 BigQuery Data Owner (
roles/bigquery.dataOwner
) 角色以创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin
) 角色以创建 Cloud Storage 存储桶。
创建并准备 Cloud Composer 环境
使用默认参数创建一个 Cloud Composer 环境:
- 选择一个美国境内的区域。
- 选择最新的 Cloud Composer 版本。
向 Cloud Composer 环境中使用的服务帐号授予以下角色,以使 Airflow 工作器成功运行 DAG 任务:
- BigQuery 用户 (
roles/bigquery.user
) - BigQuery 数据所有者 (
roles/bigquery.dataOwner
) - Service Account User (
roles/iam.serviceAccountUser
) - Dataproc 编辑者 (
roles/dataproc.editor
) - Dataproc 工作器 (
roles/dataproc.worker
)
- BigQuery 用户 (
在 Google Cloud 中创建和修改相关资源
在 Cloud Composer 环境中安装
apache-airflow-providers-microsoft-azure
PyPI 软件包。使用以下参数创建一个空 BigQuery 数据集:
- 名称:
holiday_weather
- 区域:
US
- 名称:
在
US
多区域中创建新的 Cloud Storage 存储桶。运行以下命令,以在要运行 Dataproc Serverless 以满足网络要求的区域中的默认子网上启用专用 Google 访问通道。我们建议您使用与 Cloud Composer 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 Azure 中创建相关资源
使用默认设置创建存储帐号。
为您的存储帐号获取访问密钥和连接字符串。
在新创建的存储帐号中,使用默认选项创建一个容器。
针对上一步中创建的容器授予 Storage Blob Delegator 角色。
上传 holidays.csv 以使用 Azure 门户中的默认选项创建块 blob。
为您在 Azure 门户中创建的块 blob 创建 SAS 令牌。
- 签名方法:用户委托密钥
- 权限:读取
- 允许的 IP 地址:无
- 允许的协议:仅限 HTTPS
从 Cloud Composer 连接到 Azure
使用 Airflow 界面添加 Microsoft Azure 连接:
依次前往管理 > 连接。
使用以下配置创建新连接:
- 连接 ID:
azure_blob_connection
- 连接类型:
Azure Blob Storage
- Blob Storage Login:您的存储帐号名称
- Blob Storage Key:存储帐号的访问密钥
- Blob Storage Account Connection String:您的存储帐号连接字符串
- SAS 令牌:从您的 blob 生成的 SAS 令牌
- 连接 ID:
使用 Dataproc Serverless 进行数据处理
探索示例 PySpark 作业
以下代码是一个示例 PySpark 作业,可将温度从摄氏度的十分之一度转换为摄氏度。此作业会将数据集的温度数据转换为其他格式。
将 PySpark 文件上传到 Cloud Storage
要将 PySpark 文件上传到 Cloud Storage,请执行以下操作:
将 data_analytics_process.py 保存到您的本地计算机。
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:
点击您之前创建的存储桶的名称。
在存储桶的对象标签页中,点击上传文件按钮,在显示的对话框中选择
data_analytics_process.py
,然后点击打开。
数据分析 DAG
探索示例 DAG
DAG 使用多个运算符来转换和统一数据:
AzureBlobStorageToGCSOperator
将 holidays.csv 文件从 Azure 块 blob 转移到 Cloud Storage 存储桶。GCSToBigQueryOperator
将 holidays.csv 文件从 Cloud Storage 提取到您之前创建的 BigQueryholidays_weather
数据集中的新表。DataprocCreateBatchOperator
使用 Dataproc Serverless 创建并运行 PySpark 批量作业。BigQueryInsertJobOperator
将“Date”列的 holidays.csv 数据与 BigQuery 公共数据集 ghcn_d 中的天气数据相联接。BigQueryInsertJobOperator
任务是使用 for 循环动态生成的,并且这些任务位于TaskGroup
中,以便在 Airflow 界面的图表视图中提高可读性。
使用 Airflow 界面添加变量
在 Airflow 中,变量是一种将任意设置或配置作为简单的键值对存储进行存储和检索的通用方式。此 DAG 使用 Airflow 变量存储常用值。如需将其添加到您的环境中,请执行以下操作:
依次前往管理 > 变量。
添加以下变量:
gcp_project
:您的项目 ID。gcs_bucket
:您之前创建的存储桶的名称(不带gs://
前缀)。gce_region
:您希望 Dataproc 作业在其中满足 Dataproc 无服务器网络要求的区域。这是您之前启用了专用 Google 访问通道的区域。dataproc_service_account
:您的 Cloud Composer 环境的服务帐号。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务帐号。azure_blob_name
:您之前创建的 blob 的名称。azure_container_name
:您之前创建的容器的名称。
将该 DAG 上传到环境的存储桶
Cloud Composer 会安排位于环境存储桶的 /dags
文件夹中的 DAG。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:
在本地机器上,保存 azureblobstoretogcsoperator_tutorial.py。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击 DAG 文件夹列中的 DAG 链接。系统会打开您的环境的 DAGs 文件夹。
点击上传文件。
在本地机器上选择
azureblobstoretogcsoperator_tutorial.py
,然后点击打开。
触发 DAG
在 Cloud Composer 环境中,点击 DAG 标签页。
点击进入 DAG ID
azure_blob_to_gcs_dag
。点击触发 DAG。
等待大约 5 到 10 分钟,直到显示表示任务已成功完成的绿色对勾标记。
验证 DAG 是否成功
在 Google Cloud 控制台中,转到 BigQuery 页面。
在浏览器面板中,点击您的项目名称。
点击
holidays_weather_joined
。点击“预览”可查看生成的表。请注意,值列中的数字以摄氏度的十分之一为单位。
点击
holidays_weather_normalized
。点击“预览”可查看生成的表。请注意,值列中的数字以摄氏度为单位。
清理
删除您为本教程创建的各个资源:
删除 Cloud Composer 环境,包括手动删除该环境的存储桶。