在 Google Cloud 中运行数据分析 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程介绍如何使用 Cloud Composer 创建 Apache Airflow DAG。DAG 会联接 BigQuery 公共数据集和存储在 Cloud Storage 存储桶中的 CSV 文件中的数据,然后运行 Dataproc Serverless 批处理作业来处理联接后的数据。

本教程中的 BigQuery 公共数据集是 ghcn_d:一个收集全球气候变化概况的综合数据库 地球。CSV 文件包含 1997 年至 2021 年美国节日的日期和名称的相关信息。

我们想通过该 DAG 回答以下问题:“芝加哥的天气有多暖 感恩节怎么办?”

目标

  • 在默认配置中创建 Cloud Composer 环境
  • 创建一个空的 BigQuery 数据集
  • 创建新的 Cloud Storage 存储桶
  • 创建和运行包含以下任务的 DAG:
    • 将外部数据集从 Cloud Storage 加载到 BigQuery
    • 在 BigQuery 中联接两个数据集
    • 运行数据分析 PySpark 作业

准备工作

启用 API

启用以下 API:

控制台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予权限

向您的用户账号授予以下角色和权限:

创建并准备 Cloud Composer 环境

  1. 创建一个 Cloud Composer 环境,默认环境 参数:

  2. 将以下角色授予 Cloud Composer 环境,让 Airflow 工作器 成功运行 DAG 任务:

    • BigQuery 用户 (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 使用以下参数创建一个空的 BigQuery 数据集

    • 名称holiday_weather
    • 区域US
  2. 创建新的 Cloud Storage 存储桶 US 多区域位置。

  3. 运行以下命令,在您要运行 Dataproc Serverless 的区域的默认子网中启用专用 Google 访问通道,以满足网络要求。周三 建议您使用 Cloud Composer 所在的区域 环境

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

使用 Dataproc Serverless 处理数据处理

探索示例 PySpark 作业

以下代码是一个 PySpark 作业示例,它会将温度从 从摄氏度到摄氏度。此作业会将数据集中的温度数据转换为其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

将支持文件上传到 Cloud Storage

如需上传存储在 holidays.csv 中的 PySpark 文件和数据集,请执行以下操作:

  1. data_analytics_process.py 保存到本地机器。

  2. holidays.csv 保存到本地机器。

  3. 在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:

    转到 Cloud Storage 浏览器

  4. 点击您之前创建的存储桶的名称。

  5. 在存储桶的对象标签页中,点击上传文件按钮。 选择对话框中的data_analytics_process.pyholidays.csv 出现,然后点击打开

数据分析 DAG

探索示例 DAG

DAG 使用多个运算符来转换和统一数据:

  • 通过 GCSToBigQueryOperator 会从以下代码中注入 holidays.csv 文件: 将 Cloud Storage 复制到 BigQuery 中的新表 您之前创建的 holidays_weather 数据集。

  • 通过 DataprocCreateBatchOperator 使用命令行创建并运行 PySpark 批量作业 Dataproc Serverless。

  • BigQueryInsertJobOperator 会将 holidays.csv 中的数据与 BigQuery 公共数据集 ghcn_d 中的数据按“日期”列联接。BigQueryInsertJobOperator 任务 使用 for 循环动态生成,而这些任务位于 TaskGroup 以提高 Airflow 界面的图表视图的可读性。

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

使用 Airflow 界面添加变量

在 Airflow 中,变量是一种通用方法,可将任意设置或配置存储和检索为简单的键值对存储。此 DAG 使用 Airflow 变量 存储常用值。如需将其添加到您的环境中,请执行以下操作:

  1. 从 Cloud Composer 控制台访问 Airflow 界面

  2. 转到管理 > 变量

  3. 添加以下变量:

    • gcp_project:您的项目 ID。

    • gcs_bucket:您之前创建的存储桶的名称(不带 gs:// 前缀)。

    • gce_region:您希望满足 Dataproc Serverless 网络要求的 Dataproc 作业的目标区域。这是您之前启用了专用 Google 访问通道的区域。

    • dataproc_service_account:您的 Cloud Composer 环境。您可以找到这项服务 进入您的服务账号的环境配置标签页 Cloud Composer 环境。

将 DAG 上传到您环境的存储桶

Cloud Composer 会安排位于环境存储桶的 /dags 文件夹中的 DAG。要使用 Google Cloud 控制台:

  1. 在本地机器上,保存 data_analytics_dag.py

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表的 DAG 文件夹列中,点击 DAG 链接。系统会打开您环境的 DAG 文件夹。

  4. 点击上传文件

  5. 在本地机器上选择 data_analytics_dag.py,然后点击打开

触发 DAG

  1. 在 Cloud Composer 环境中,点击 DAG 标签页。

  2. 点击进入 DAG ID data_analytics_dag

  3. 点击触发 DAG

  4. 等待大约 5 到 10 分钟,直到您看到绿色对勾,表示任务已成功完成。

验证 DAG 是否成功

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,点击您的项目名称。

  3. 点击 holidays_weather_joined

  4. 点击“预览”以查看生成的表格。请注意,值列中的数字以摄氏度为单位,小数点后有一位。

  5. 点击 holidays_weather_normalized

  6. 点击“预览”以查看生成的表格。请注意,值列中的数字以摄氏度为单位。

深入了解 Dataproc Serverless(可选)

您可以尝试使用更复杂的 PySpark 数据处理流程来试用此 DAG 的高级版本。请参阅 GitHub 上的适用于数据分析示例的 Dataproc 扩展

清理

删除您为本教程创建的各个资源:

后续步骤