使用来自 Azure 的数据在 Google Cloud 中运行数据分析 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程是 在 Google Cloud 中运行数据分析 DAG,其中介绍了如何将 Cloud Composer 环境连接到 Microsoft Azure 以使用其中存储的数据。其中说明了如何 使用 Cloud Composer 创建 Apache Airflow DAG。通过 DAG 将 BigQuery 公共数据集中的数据与存储的 CSV 文件中的数据相联接 以 Azure Blob 存储 然后运行 Dataproc Serverless 批处理作业 数据。

本教程中的 BigQuery 公共数据集是 ghcn_d,它是一个集成的全球气候摘要数据库。CSV 文件包含 1997 年至 2021 年美国节日的日期和名称的相关信息。

我们想通过该 DAG 回答以下问题:“芝加哥的天气有多暖 感恩节怎么办?”

目标

  • 使用默认配置创建 Cloud Composer 环境
  • 在 Azure 中创建 blob
  • 创建空 BigQuery 数据集
  • 创建新的 Cloud Storage 存储桶
  • 创建和运行包含以下任务的 DAG:
    • 将外部数据集从 Azure Blob Storage 加载到 Cloud Storage
    • 将外部数据集从 Cloud Storage 加载到 BigQuery
    • 在 BigQuery 中联接两个数据集
    • 运行数据分析 PySpark 作业

准备工作

启用 API

启用以下 API:

控制台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予权限

向您的用户账号授予以下角色和权限:

创建并准备 Cloud Composer 环境

  1. 使用默认参数创建 Cloud Composer 环境

  2. 将以下角色授予 Cloud Composer 环境,让 Airflow 工作器 成功运行 DAG 任务:

    • BigQuery 用户 (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 安装 apache-airflow-providers-microsoft-azure PyPI 软件包 Cloud Composer 环境。

  2. 创建空 BigQuery 数据集 参数如下:

    • 名称holiday_weather
    • 区域US
  3. 创建新的 Cloud Storage 存储桶 US 多区域位置。

  4. 运行以下命令,在您要运行 Dataproc Serverless 的区域的默认子网中启用专用 Google 访问通道,以满足网络要求。我们建议您使用与 Cloud Composer 环境相同的区域。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. 使用默认设置创建存储账号

  2. 获取存储空间账号的访问密钥和连接字符串

  3. 创建容器 默认选项。

  4. 为在上一步中创建的容器授予 Storage Blob Delegator 角色。

  5. 上传 holidays.csv,以便在 Azure 门户中使用默认选项创建一个分块 blob

  6. 在 Azure 门户中,为您在上一步中创建的块 blob 创建 SAS 令牌

    • 签名方法:用户委托密钥
    • 权限:读取
    • 允许的 IP 地址:无
    • 允许的协议:仅限 HTTPS

从 Cloud Composer 连接到 Azure

添加 Microsoft Azure 连接 使用 Airflow 界面执行以下操作:

  1. 转到管理 > 连接

  2. 使用以下配置创建新连接:

    • 连接 IDazure_blob_connection
    • 连接类型Azure Blob Storage
    • Blob Storage Login:您的存储账号名称
    • Blob Storage Key:存储账号的访问密钥
    • Blob Storage Account Connection String:您的存储账号 连接字符串
    • SAS 令牌:从您的 blob 生成的 SAS 令牌

使用 Dataproc Serverless 处理数据处理

探索 PySpark 作业示例

以下代码是一个 PySpark 作业示例,它会将温度从 从摄氏度到摄氏度。此招聘信息将 转换为不同的格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

将 PySpark 文件上传到 Cloud Storage

如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:

  1. data_analytics_process.py 保存到本地机器。

  2. 在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:

    转到 Cloud Storage 浏览器

  3. 点击您之前创建的存储桶的名称。

  4. 在相应存储桶的对象标签页中,点击上传文件按钮,在随即显示的对话框中选择 data_analytics_process.py,然后点击打开

数据分析 DAG

探索示例 DAG

DAG 使用多个运算符来转换和统一数据:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow 界面添加变量

在 Airflow 中 variables 是一种用于存储和检索任意设置 存储为简单的键值对此 DAG 使用 Airflow 变量来存储常见值。如需将它们添加到您的环境中,请执行以下操作:

  1. 从 Cloud Composer 控制台访问 Airflow 界面

  2. 依次前往管理 > 变量

  3. 添加以下变量:

    • gcp_project:您的项目 ID。

    • gcs_bucket:您之前创建的存储桶的名称(不带 gs:// 前缀)。

    • gce_region:您希望 Dataproc 作业, Dataproc 无服务器网络要求。 这是您之前启用专用 Google 访问通道的区域。

    • dataproc_service_account:Cloud Composer 环境的服务账号。您可以找到这项服务 进入您的服务账号的环境配置标签页 Cloud Composer 环境。

    • azure_blob_name:您之前创建的 blob 的名称。

    • azure_container_name:您之前创建的容器的名称。

将该 DAG 上传到环境的存储桶

Cloud Composer 会安排 /dags 文件夹。要使用 Google Cloud 控制台:

  1. 在本地机器上,保存 azureblobstoretogcsoperator_tutorial.py

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表的 DAG 文件夹列中,点击 DAG 链接。系统会打开您的环境的 DAGs 文件夹。

  4. 点击上传文件

  5. 在本地机器上选择 azureblobstoretogcsoperator_tutorial.py,然后 点击打开

触发 DAG

  1. 在您的 Cloud Composer 环境中,点击 DAG 标签页。

  2. 点击进入 DAG ID azure_blob_to_gcs_dag

  3. 点击触发 DAG

  4. 等待大约 5 到 10 分钟,直到出现绿色对勾标记,表明 任务已成功完成。

验证 DAG 是否成功

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,点击您的项目名称。

  3. 点击 holidays_weather_joined

  4. 点击“预览”以查看生成的表格。请注意,值列中的数字以摄氏度为单位,小数点后有两位数。

  5. 点击 holidays_weather_normalized

  6. 点击“预览”以查看生成的表格。请注意,值列中的数字以摄氏度为单位。

清理

删除您为本教程创建的各个资源:

后续步骤