在 Google Cloud 中執行資料分析 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本教學課程說明如何使用 Cloud Composer 建立 Apache Airflow DAG。DAG 會合併 BigQuery 公開資料集和儲存在 Cloud Storage 值區中的 CSV 檔案資料,然後執行 Dataproc Serverless 批次工作,處理合併後的資料。

本教學課程使用的 BigQuery 公開資料集是 ghcn_d,這是全球氣候摘要的整合資料庫。CSV 檔案包含 1997 年至 2021 年美國節日的日期和名稱。

我們想透過 DAG 回答的問題是:「過去 25 年來,芝加哥在感恩節當天的氣溫如何?」

目標

  • 以預設設定建立 Cloud Composer 環境
  • 建立空白的 BigQuery 資料集
  • 建立新的 Cloud Storage bucket
  • 建立並執行包含下列工作的 DAG:
    • 將外部資料集從 Cloud Storage 載入 BigQuery
    • 在 BigQuery 中聯結兩個資料集
    • 執行資料分析 PySpark 工作

事前準備

啟用 API

啟用下列 API:

主控台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予權限

將下列角色和權限授予使用者帳戶:

建立及準備 Cloud Composer 環境

  1. 建立 Cloud Composer 環境,並使用預設參數:

  2. 將下列角色授予 Cloud Composer 環境中使用的服務帳戶,Airflow 工作者才能順利執行 DAG 工作:

    • BigQuery 使用者 (roles/bigquery.user)
    • BigQuery 資料擁有者 (roles/bigquery.dataOwner)
    • 服務帳戶使用者 (roles/iam.serviceAccountUser)
    • Dataproc 編輯器 (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 建立空白的 BigQuery 資料集,並使用下列參數:

    • Name (名稱):holiday_weather
    • Region (區域):US
  2. US 多區域中建立新的 Cloud Storage 值區

  3. 執行下列指令,在要執行 Dataproc Serverless 的區域中,對預設子網路啟用私人 Google 存取權,以滿足網路需求。建議使用與 Cloud Composer 環境相同的區域。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

使用 Dataproc Serverless 處理資料

探索 PySpark 工作範例

以下程式碼是 PySpark 工作範例,可將攝氏溫度從十分之一度轉換為攝氏度。這項工作會將資料集中的溫度資料轉換為其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

將支援檔案上傳至 Cloud Storage

如要上傳 PySpark 檔案和儲存在 holidays.csv 中的資料集,請按照下列步驟操作:

  1. data_analytics_process.py 儲存到本機。

  2. holidays.csv 儲存到本機。

  3. 前往 Google Cloud 控制台的「Cloud Storage 瀏覽器」頁面:

    前往 Cloud Storage 瀏覽器

  4. 按一下您稍早建立的 bucket 名稱。

  5. 在值區的「物件」分頁中,按一下「上傳檔案」按鈕,在出現的對話方塊中選取 data_analytics_process.pyholidays.csv,然後按一下「開啟」

資料分析 DAG

探索範例 DAG

DAG 會使用多個運算子轉換及合併資料:

  • GCSToBigQueryOperator 會從 Cloud Storage 將 holidays.csv 檔案擷取到您先前建立的 BigQuery holidays_weather 資料集中的新資料表。

  • DataprocCreateBatchOperator 會使用 Dataproc Serverless 建立及執行 PySpark 批次工作。

  • BigQueryInsertJobOperator 會將 holidays.csv 中的資料,與 BigQuery 公開資料集 ghcn_d 中的氣象資料,依據「Date」欄位進行聯結。BigQueryInsertJobOperator 工作是使用 for 迴圈動態產生,這些工作位於 TaskGroup 中,方便在 Airflow UI 的圖表檢視畫面中閱讀。

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

使用 Airflow UI 新增變數

在 Airflow 中,變數是儲存及擷取任意設定或設定的通用方式,可做為簡單的鍵值儲存空間。這個 DAG 會使用 Airflow 變數儲存通用值。如要將這些變數新增至環境,請按照下列步驟操作:

  1. 從 Cloud Composer 控制台存取 Airflow UI

  2. 依序前往「管理」>「變數」

  3. 新增下列變數:

    • gcp_project:您的專案 ID。

    • gcs_bucket:您先前建立的值區名稱 (不含 gs:// 前置字串)。

    • :您要執行 Dataproc 作業的區域,該作業必須符合 Dataproc Serverless 網路需求gce_region這是您先前啟用私人 Google 存取權的區域。

    • dataproc_service_account:Cloud Composer 環境的服務帳戶。您可以在 Cloud Composer 環境的環境設定分頁中找到這個服務帳戶。

將 DAG 上傳至環境的值區

Cloud Composer 會排程環境 bucket 中 /dags 資料夾內的 DAG。如要使用Google Cloud 控制台上傳 DAG,請按照下列步驟操作:

  1. 在本機電腦上儲存 data_analytics_dag.py

  2. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  3. 在環境清單的「DAG folder」(DAG 資料夾) 欄中,按一下「DAGs」(DAG) 連結。系統會開啟環境的 DAG 資料夾。

  4. 按一下「上傳檔案」

  5. 選取本機上的 data_analytics_dag.py,然後按一下「開啟」

觸發 DAG

  1. 在 Cloud Composer 環境中,按一下「DAGs」分頁標籤。

  2. 按一下 DAG ID data_analytics_dag

  3. 按一下「觸發 DAG」

  4. 等待約五到十分鐘,直到看到綠色勾號,表示工作已順利完成。

驗證 DAG 是否成功

  1. 前往 Google Cloud 控制台的「BigQuery」BigQuery頁面。

    前往「BigQuery」

  2. 在「Explorer」面板中,按一下專案名稱。

  3. 按一下「holidays_weather_joined」。

  4. 按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏溫度十分之一為單位。

  5. 按一下「holidays_weather_normalized」。

  6. 按一下「預覽」,查看產生的資料表。請注意,「值」欄中的數字是以攝氏度為單位。

深入瞭解無伺服器型 Dataproc (選用)

您可以試用這個 DAG 的進階版本,瞭解更複雜的 PySpark 資料處理流程。請參閱 GitHub 上的「Dataproc extension for the Data Analytics Example」。

清除所用資源

刪除您為本教學課程建立的個別資源:

後續步驟