使用来自 AWS 的数据在 Google Cloud 中运行数据分析 DAG

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本教程是 在 Google Cloud 中运行数据分析 DAG 展示了如何将 Cloud Composer 环境连接到 Amazon 网络服务利用其中存储的数据。该教程介绍了如何使用 Cloud Composer 创建 Apache Airflow DAG。通过 DAG 将 BigQuery 公共数据集中的数据与存储的 CSV 文件中的数据相联接 以 Amazon Web Services (AWS) S3 存储桶 然后运行 Dataproc Serverless 批处理作业 数据。

本教程中的 BigQuery 公共数据集是 ghcn_d:一个收集全球气候变化概况的综合数据库 地球。CSV 文件包含 1997 年至 2021 年美国节日的日期和名称的相关信息。

我们希望使用 DAG 回答的问题是:“过去 25 年来,芝加哥在感恩节当天天气有多暖?”

目标

  • 使用默认配置创建 Cloud Composer 环境
  • 在 AWS S3 中创建存储桶
  • 创建空 BigQuery 数据集
  • 创建新的 Cloud Storage 存储桶
  • 创建和运行包含以下任务的 DAG:
    • 将外部数据集从 S3 加载到 Cloud Storage
    • 将外部数据集从 Cloud Storage 加载到 BigQuery
    • 在 BigQuery 中联接两个数据集
    • 运行数据分析 PySpark 作业

准备工作

在 AWS 中管理权限

  1. 创建 AWS 账号

  2. 按照“使用可视化编辑器创建政策”部分中的说明操作的 AWS 创建 IAM 政策教程 使用以下配置为 AWS S3 创建自定义 IAM 政策:

    • 服务:S3
    • ListAllMyBuckets (s3:ListAllMyBuckets),用于查看您的 S3 存储桶
    • CreateBucket (s3:CreateBucket),用于创建存储桶
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls),用于创建存储桶
    • ListBucket (s3:ListBucket),用于授予列出 S3 存储桶中的对象的权限
    • PutObject (s3:PutObject),用于将文件上传到存储桶
    • GetBucketVersioning (s3:GetBucketVersioning),用于删除存储桶中的对象
    • DeleteObject (s3:DeleteObject),用于删除存储桶中的对象
    • ListBucketVersions (s3:ListBucketVersions),用于删除存储桶
    • DeleteBucket (s3:DeleteBucket),用于删除存储桶
    • 资源:选择“存储桶”和“对象”旁边的“任意”,即可向该类型的任何资源授予权限。
    • 代码:无
    • 名称:TutorialPolicy

    请参阅 Amazon S3 中支持的操作的列表

  3. TutorialPolicy IAM 政策添加到您的身份

启用 API

启用以下 API:

控制台

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

授予权限

向您的用户账号授予以下角色和权限:

创建并准备 Cloud Composer 环境

  1. 创建一个 Cloud Composer 环境,默认环境 参数:

  2. 向 Cloud Composer 环境中使用的服务账号授予以下角色,以便 Airflow 工作器能够成功运行 DAG 任务:

    • BigQuery 用户 (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. 在 Cloud Composer 环境中安装 apache-airflow-providers-amazon PyPI 软件包

  2. 创建空 BigQuery 数据集 参数如下:

    • 名称holiday_weather
    • 区域US
  3. US 多区域中创建新的 Cloud Storage 存储桶

  4. 运行以下命令 启用专用 Google 访问通道 您要运行的区域中的默认子网上 Dataproc Serverless 可满足的要求 网络要求。我们建议您使用与 Cloud Composer 环境相同的区域。

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

在首选区域中使用默认设置创建 S3 存储桶

从 Cloud Composer 连接到 AWS

  1. 获取您的 AWS 访问密钥 ID 和私有访问密钥
  2. 使用 Airflow 界面添加 AWS S3 连接

    1. 依次选择管理 > 关联
    2. 使用以下配置创建新连接:

      • 连接 IDaws_s3_connection
      • 连接类型Amazon S3
      • Extra{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

使用 Dataproc Serverless 处理数据处理

探索示例 PySpark 作业

以下代码是一个 PySpark 作业示例,它会将温度从 从摄氏度到摄氏度。此作业会将数据集中的温度数据转换为其他格式。

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

将 PySpark 文件上传到 Cloud Storage

如需将 PySpark 文件上传到 Cloud Storage,请执行以下操作:

  1. 保存 data_analytics_process.py 您的本地机器

  2. 在 Google Cloud 控制台中,前往 Cloud Storage 浏览器页面:

    转到 Cloud Storage 浏览器

  3. 点击您之前创建的存储桶的名称。

  4. 在相应存储桶的对象标签页中,点击上传文件按钮,在随即显示的对话框中选择 data_analytics_process.py,然后点击打开

将 CSV 文件上传到 AWS S3

如需上传 holidays.csv 文件,请执行以下操作:

  1. holidays.csv 保存在本地机器上。
  2. 按照 AWS 指南 将该文件上传到您的存储桶。

数据分析 DAG

探索示例 DAG

DAG 使用多个运算符来转换和统一数据:

  • S3ToGCSOperator 会将 holidays.csv 文件从您的 AWS S3 存储桶转移到您的 Cloud Storage 存储桶。

  • 通过 GCSToBigQueryOperator 会从以下代码中注入 holidays.csv 文件: 将 Cloud Storage 复制到 BigQuery 中的新表 您之前创建的 holidays_weather 数据集。

  • 通过 DataprocCreateBatchOperator 使用命令行创建并运行 PySpark 批量作业 Dataproc Serverless。

  • BigQueryInsertJobOperator 会将 holidays.csv 中的数据与 BigQuery 公共数据集 ghcn_d 中的数据按“日期”列联接。BigQueryInsertJobOperator 任务是使用 for 循环动态生成的,并且这些任务位于 TaskGroup 中,以便在 Airflow 界面的图表视图中更好地阅读。

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

使用 Airflow 界面添加变量

在 Airflow 中 variables 是一种用于存储和检索任意设置 存储为简单的键值对此 DAG 使用 Airflow 变量 存储常用值。如需将它们添加到您的环境中,请执行以下操作:

  1. 从 Cloud Composer 控制台访问 Airflow 界面

  2. 转到管理 > 变量

  3. 添加以下变量:

    • s3_bucket:您之前创建的 S3 存储桶的名称。

    • gcp_project:您的项目 ID。

    • gcs_bucket:您之前创建的存储桶的名称(不带 gs:// 前缀)。

    • gce_region:您希望 Dataproc 作业, Dataproc 无服务器网络要求。 这是您之前启用了专用 Google 访问通道的区域。

    • dataproc_service_account:Cloud Composer 环境的服务账号。您可以在 Cloud Composer 环境的“环境配置”标签页中找到此服务账号。

将 DAG 上传到您环境的存储桶

Cloud Composer 会安排 /dags 文件夹。要使用 Google Cloud 控制台:

  1. 在本地机器上,保存 s3togcsoperator_tutorial.py

  2. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  3. 在环境列表的 DAG 文件夹列中,点击 DAG 链接。系统会打开您的环境的 DAGs 文件夹。

  4. 点击上传文件

  5. 在本地机器上选择 s3togcsoperator_tutorial.py,然后点击 打开

触发 DAG

  1. 在您的 Cloud Composer 环境中,点击 DAG 标签页。

  2. 点击进入 DAG ID s3_to_gcs_dag

  3. 点击触发 DAG

  4. 等待大约 5 到 10 分钟,直到出现绿色对勾标记,表明 任务已成功完成。

验证 DAG 是否成功

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 浏览器面板中,点击您的项目名称。

  3. 点击 holidays_weather_joined

  4. 点击“预览”以查看生成的表格。请注意, 值列的单位为十分之一摄氏度。

  5. 点击 holidays_weather_normalized

  6. 点击“预览”可查看生成的表。请注意,值列中的数字以摄氏度为单位。

清理

删除您为本教程创建的各个资源:

后续步骤