Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本教程是对在 Google Cloud 中运行数据分析 DAG 的修改,介绍了如何将 Cloud Composer 环境连接到 Microsoft Azure,以便利用存储在其中的数据。其中说明了如何 使用 Cloud Composer 创建 Apache Airflow DAG。通过 DAG 将 BigQuery 公共数据集中的数据与存储的 CSV 文件中的数据相联接 以 Azure Blob 存储 然后运行 Dataproc Serverless 批处理作业 数据。
本教程中的 BigQuery 公共数据集是 ghcn_d:一个收集全球气候变化概况的综合数据库 地球。CSV 文件包含 1997 年至 2021 年美国节日的日期和名称的相关信息。
我们想通过该 DAG 回答以下问题:“芝加哥的天气有多暖 感恩节怎么办?”
目标
- 使用默认配置创建 Cloud Composer 环境
- 在 Azure 中创建 blob
- 创建一个空的 BigQuery 数据集
- 创建新的 Cloud Storage 存储桶
- 创建和运行包含以下任务的 DAG:
- 将外部数据集从 Azure Blob Storage 加载到 Cloud Storage
- 将外部数据集从 Cloud Storage 加载到 BigQuery
- 在 BigQuery 中联接两个数据集
- 运行数据分析 PySpark 作业
准备工作
启用 API
启用以下 API:
控制台
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
授予权限
向您的用户账号授予以下角色和权限:
授予 BigQuery Data Owner (
roles/bigquery.dataOwner
) 角色以创建 BigQuery 数据集。授予 Storage Admin (
roles/storage.admin
) 角色,以创建 Cloud Storage 存储桶。
创建并准备 Cloud Composer 环境
创建一个 Cloud Composer 环境,默认环境 参数:
- 选择美国境内的某个区域。
- 选择最新的 Cloud Composer 版本。
将以下角色授予 Cloud Composer 环境,让 Airflow 工作器 成功运行 DAG 任务:
- BigQuery User (
roles/bigquery.user
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Service Account User (
roles/iam.serviceAccountUser
) - Dataproc Editor (
roles/dataproc.editor
) - Dataproc Worker (
roles/dataproc.worker
)
- BigQuery User (
在 Google Cloud 中创建和修改相关资源
在 Cloud Composer 环境中安装
apache-airflow-providers-microsoft-azure
PyPI 软件包。创建空 BigQuery 数据集 参数如下:
- 名称:
holiday_weather
- 区域:
US
- 名称:
在
US
多区域中创建新的 Cloud Storage 存储桶。运行以下命令 启用专用 Google 访问通道 您要运行的区域中的默认子网上 Dataproc Serverless 可满足的要求 网络要求。我们建议您使用与 Cloud Composer 环境相同的区域。
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
在 Azure 中创建相关资源
使用默认设置创建存储账号。
获取访问密钥和连接字符串 存储数据
在您新创建的存储空间账号中使用默认选项创建容器。
授予 Storage Blob Delegator 您在上一步中创建的容器拥有此角色。
上传 holidays.csv,以便在 Azure 门户中使用默认选项创建一个分块 blob。
在 Azure 门户中,为您在上一步中创建的块 blob 创建 SAS 令牌。
- 签名方法:用户委托密钥
- 权限:读取
- 允许使用的 IP 地址:无
- 允许的协议:仅限 HTTPS
从 Cloud Composer 连接到 Azure
添加 Microsoft Azure 连接 使用 Airflow 界面执行以下操作:
依次选择管理 > 关联。
使用以下配置创建新连接:
- 连接 ID:
azure_blob_connection
- 连接类型:
Azure Blob Storage
- Blob Storage 登录信息:您的存储账号名称
- Blob Storage 密钥:存储账号的访问密钥
- Blob Storage Account Connection String:您的存储账号 连接字符串
- SAS 令牌:从您的 blob 生成的 SAS 令牌
- 连接 ID:
使用 Dataproc Serverless 处理数据处理
探索示例 PySpark 作业
以下代码是一个 PySpark 作业示例,用于将温度从十分之一摄氏度转换为摄氏度。此作业会将数据集中的温度数据转换为其他格式。
将 PySpark 文件上传到 Cloud Storage
如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:
保存 data_analytics_process.py 您的本地机器
在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:
点击您之前创建的存储桶的名称。
在存储桶的对象标签页中,点击上传文件按钮。 在显示的对话框中选择
data_analytics_process.py
,然后点击 Open。
数据分析 DAG
探索示例 DAG
DAG 使用多个运算符来转换和统一数据:
AzureBlobStorageToGCSOperator
会将 holidays.csv 文件从您的 Azure 块 blob 传输到您的 Cloud Storage 存储桶。GCSToBigQueryOperator
会将 Cloud Storage 中的 holidays.csv 文件提取到您之前创建的 BigQueryholidays_weather
数据集中的新表中。DataprocCreateBatchOperator
使用 Dataproc Serverless 创建和运行 PySpark 批处理作业。BigQueryInsertJobOperator
会将 holidays.csv 中的数据与 BigQuery 公共数据集 ghcn_d 中的数据按“日期”列联接。BigQueryInsertJobOperator
任务是使用 for 循环动态生成的,并且这些任务位于TaskGroup
中,以便在 Airflow 界面的图表视图中更好地阅读。
使用 Airflow 界面添加变量
在 Airflow 中,变量是一种通用方法,可将任意设置或配置存储和检索为简单的键值对存储。此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境中,请执行以下操作:
依次前往管理 > 变量。
添加以下变量:
gcp_project
:您的项目 ID。gcs_bucket
:您之前创建的存储桶的名称(不带gs://
前缀)。gce_region
:您希望满足 Dataproc Serverless 网络要求的 Dataproc 作业的目标区域。这是您之前启用了专用 Google 访问通道的区域。dataproc_service_account
:您的 Cloud Composer 环境。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务账号。azure_blob_name
:您之前创建的 blob 的名称。azure_container_name
:您之前创建的容器的名称。
将 DAG 上传到您环境的存储桶
Cloud Composer 会安排
/dags
文件夹。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:
在本地机器上,保存 azureblobstoretogcsoperator_tutorial.py。
在 Google Cloud 控制台中,前往环境页面。
在环境列表的 DAG 文件夹列中,点击 DAG 链接。系统会打开您的环境的 DAGs 文件夹。
点击上传文件。
在本地机器上选择
azureblobstoretogcsoperator_tutorial.py
,然后 点击打开。
触发 DAG
在您的 Cloud Composer 环境中,点击 DAG 标签页。
点击 DAG ID
azure_blob_to_gcs_dag
。点击触发 DAG。
等待大约 5 到 10 分钟,直到出现绿色对勾标记,表明 任务已成功完成。
验证 DAG 是否成功
在 Google Cloud 控制台中,转到 BigQuery 页面。
在浏览器面板中,点击您的项目名称。
点击
holidays_weather_joined
。点击“预览”可查看生成的表。请注意, 值列的单位为十分之一摄氏度。
点击
holidays_weather_normalized
。点击“预览”可查看生成的表。请注意, 值列的单位是摄氏度。
清理
删除您为本教程创建的各个资源:
删除 Cloud Composer 环境,包括 手动删除环境的存储桶