使用 Azure 中的数据在 Google Cloud 中运行数据分析 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程是对在 Google Cloud 中运行数据分析 DAG 的修改,介绍了如何将 Cloud Composer 环境连接到 Microsoft Azure,以便利用存储在其中的数据。其中说明了如何 使用 Cloud Composer 创建 Apache Airflow DAG。通过 DAG 将 BigQuery 公共数据集中的数据与存储的 CSV 文件中的数据相联接 以 Azure Blob 存储 然后运行 Dataproc Serverless 批处理作业 数据。

本教程中的 BigQuery 公共数据集是 ghcn_d:一个收集全球气候变化概况的综合数据库 地球。CSV 文件包含 1997 年至 2021 年美国节日的日期和名称的相关信息。

我们想通过该 DAG 回答以下问题:“芝加哥的天气有多暖 感恩节怎么办?”

目标

  • 使用默认配置创建 Cloud Composer 环境
  • 在 Azure 中创建 blob
  • 创建一个空的 BigQuery 数据集
  • 创建新的 Cloud Storage 存储桶
  • 创建和运行包含以下任务的 DAG:
    • 将外部数据集从 Azure Blob Storage 加载到 Cloud Storage
    • 将外部数据集从 Cloud Storage 加载到 BigQuery
    • 在 BigQuery 中联接两个数据集
    • 运行数据分析 PySpark 作业

准备工作

启用 API

启用以下 API:

控制台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予权限

向您的用户账号授予以下角色和权限:

创建并准备 Cloud Composer 环境

  1. 创建一个 Cloud Composer 环境,默认环境 参数:

  2. 将以下角色授予 Cloud Composer 环境,让 Airflow 工作器 成功运行 DAG 任务:

    • BigQuery User (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 在 Cloud Composer 环境中安装 apache-airflow-providers-microsoft-azure PyPI 软件包

  2. 创建空 BigQuery 数据集 参数如下:

    • 名称holiday_weather
    • 区域US
  3. US 多区域中创建新的 Cloud Storage 存储桶

  4. 运行以下命令 启用专用 Google 访问通道 您要运行的区域中的默认子网上 Dataproc Serverless 可满足的要求 网络要求。我们建议您使用与 Cloud Composer 环境相同的区域。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. 使用默认设置创建存储账号

  2. 获取访问密钥和连接字符串 存储数据

  3. 在您新创建的存储空间账号中使用默认选项创建容器

  4. 授予 Storage Blob Delegator 您在上一步中创建的容器拥有此角色。

  5. 上传 holidays.csv,以便在 Azure 门户中使用默认选项创建一个分块 blob

  6. 在 Azure 门户中,为您在上一步中创建的块 blob 创建 SAS 令牌

    • 签名方法:用户委托密钥
    • 权限:读取
    • 允许使用的 IP 地址:无
    • 允许的协议:仅限 HTTPS

从 Cloud Composer 连接到 Azure

添加 Microsoft Azure 连接 使用 Airflow 界面执行以下操作:

  1. 依次选择管理 > 关联

  2. 使用以下配置创建新连接:

    • 连接 IDazure_blob_connection
    • 连接类型Azure Blob Storage
    • Blob Storage 登录信息:您的存储账号名称
    • Blob Storage 密钥:存储账号的访问密钥
    • Blob Storage Account Connection String:您的存储账号 连接字符串
    • SAS 令牌:从您的 blob 生成的 SAS 令牌

使用 Dataproc Serverless 处理数据处理

探索示例 PySpark 作业

以下代码是一个 PySpark 作业示例,用于将温度从十分之一摄氏度转换为摄氏度。此作业会将数据集中的温度数据转换为其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

将 PySpark 文件上传到 Cloud Storage

如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:

  1. 保存 data_analytics_process.py 您的本地机器

  2. 在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:

    转到 Cloud Storage 浏览器

  3. 点击您之前创建的存储桶的名称。

  4. 在存储桶的对象标签页中,点击上传文件按钮。 在显示的对话框中选择 data_analytics_process.py,然后点击 Open

数据分析 DAG

探索示例 DAG

DAG 使用多个运算符来转换和统一数据:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow 界面添加变量

在 Airflow 中,变量是一种通用方法,可将任意设置或配置存储和检索为简单的键值对存储。此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境中,请执行以下操作:

  1. 从 Cloud Composer 控制台访问 Airflow 界面

  2. 依次前往管理 > 变量

  3. 添加以下变量:

    • gcp_project:您的项目 ID。

    • gcs_bucket:您之前创建的存储桶的名称(不带 gs:// 前缀)。

    • gce_region:您希望满足 Dataproc Serverless 网络要求的 Dataproc 作业的目标区域。这是您之前启用了专用 Google 访问通道的区域。

    • dataproc_service_account:您的 Cloud Composer 环境。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务账号。

    • azure_blob_name:您之前创建的 blob 的名称。

    • azure_container_name:您之前创建的容器的名称。

将 DAG 上传到您环境的存储桶

Cloud Composer 会安排 /dags 文件夹。如需使用 Google Cloud 控制台上传 DAG,请执行以下操作:

  1. 在本地机器上,保存 azureblobstoretogcsoperator_tutorial.py

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表的 DAG 文件夹列中,点击 DAG 链接。系统会打开您的环境的 DAGs 文件夹。

  4. 点击上传文件

  5. 在本地机器上选择 azureblobstoretogcsoperator_tutorial.py,然后 点击打开

触发 DAG

  1. 在您的 Cloud Composer 环境中,点击 DAG 标签页。

  2. 点击 DAG ID azure_blob_to_gcs_dag

  3. 点击触发 DAG

  4. 等待大约 5 到 10 分钟,直到出现绿色对勾标记,表明 任务已成功完成。

验证 DAG 是否成功

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,点击您的项目名称。

  3. 点击 holidays_weather_joined

  4. 点击“预览”可查看生成的表。请注意, 值列的单位为十分之一摄氏度。

  5. 点击 holidays_weather_normalized

  6. 点击“预览”可查看生成的表。请注意, 值列的单位是摄氏度。

清理

删除您为本教程创建的各个资源:

后续步骤