Azure의 데이터를 사용하여 Google Cloud에서 데이터 분석 DAG 실행

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 튜토리얼은 Cloud Composer 환경을 Microsoft Azure에 연결하여 여기에 저장된 데이터를 활용하는 방법을 보여주는 Google Cloud에서 데이터 분석 DAG 실행의 수정판입니다. 여기에서는 Cloud Composer를 사용하여 Apache Airflow DAG를 만드는 방법을 보여줍니다. DAG는 BigQuery 공개 데이터 세트의 데이터와 Azure Blob Storage에 저장된 CSV 파일을 조인한 후 Dataproc 서버리스 일괄 작업을 실행하여 조인된 데이터를 처리합니다.

이 튜토리얼의 BigQuery 공개 데이터 세트는 전 세계 기후 요약 통합 데이터베이스인 ghcn_d입니다. CSV 파일에는 1997년부터 2021년까지 미국 공휴일 날짜와 이름에 대한 정보가 포함되어 있습니다.

DAG를 사용하여 답변하려는 질문은 '지난 25년 동안 추수감사절에 시카고는 얼마나 따뜻했나요?'입니다.

목표

  • 기본 구성으로 Cloud Composer 환경 만들기
  • Azure에서 Blob 만들기
  • 빈 BigQuery 데이터 세트 만들기
  • 새 Cloud Storage 버킷 만들기
  • 다음 태스크가 포함된 DAG 만들고 실행하기:
    • Azure Blob Storage에서 Cloud Storage로 외부 데이터 세트 로드
    • Cloud Storage에서 BigQuery로 외부 데이터 세트 로드
    • BigQuery에서 두 데이터 세트 결합
    • 데이터 분석 PySpark 작업 실행

시작하기 전에

API 사용 설정

다음 API를 사용 설정합니다.

콘솔

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

권한 부여

사용자 계정에 다음 역할과 권한을 부여합니다.

Cloud Composer 환경 만들기 및 준비

  1. 기본 매개변수로 Cloud Composer 환경을 만듭니다.

  2. Airflow 작업자가 DAG 태스크를 성공적으로 실행할 수 있도록 Cloud Composer 환경에서 사용되는 서비스 계정에 다음 역할을 부여합니다.

    • BigQuery 사용자(roles/bigquery.user)
    • BigQuery 데이터 소유자(roles/bigquery.dataOwner)
    • 서비스 계정 사용자(roles/iam.serviceAccountUser)
    • Dataproc 편집자(roles/dataproc.editor)
    • Dataproc 작업자(roles/dataproc.worker)
  1. Cloud Composer 환경에 apache-airflow-providers-microsoft-azure PyPI 패키지를 설치합니다.

  2. 다음 매개변수를 사용하여 빈 BigQuery 데이터 세트를 만듭니다.

    • 이름: holiday_weather
    • 리전: US
  3. US 멀티 리전에 새 Cloud Storage 버킷을 만듭니다.

  4. 다음 명령어를 실행하여 네트워킹 요구사항이 충족되도록 Dataproc 서버리스를 실행하려는 리전의 기본 서브넷에서 비공개 Google 액세스를 사용 설정합니다. Cloud Composer 환경과 동일한 리전을 사용하는 것이 좋습니다.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. 기본 설정으로 스토리지 계정을 만듭니다.

  2. 스토리지 계정의 액세스 키와 연결 문자열을 가져옵니다.

  3. 새로 만든 스토리지 계정에서 기본 옵션으로 컨테이너를 만듭니다.

  4. 이전 단계에서 만든 컨테이너에 대한 스토리지 Blob 위임자 역할을 부여합니다.

  5. holidays.csv를 업로드하여 Azure 포털에서 기본 옵션으로 블록 Blob를 만듭니다.

  6. Azure 포털에서 이전 단계에 만든 블록 blob에 대한 SAS 토큰을 만듭니다.

    • 서명 방법 - 사용자 위임 키
    • 권한 - 읽기
    • 허용된 IP 주소 - 없음
    • 허용되는 프로토콜 - HTTPS 전용

Cloud Composer에서 Azure에 연결

Airflow UI를 사용하여 Microsoft Azure 연결을 추가합니다.

  1. 관리자 > 연결로 이동합니다.

  2. 다음 구성으로 새 연결을 만듭니다.

    • 연결 ID: azure_blob_connection
    • 연결 유형: Azure Blob Storage
    • Blob 스토리지 로그인: 스토리지 계정 이름
    • Blob 스토리지 키: 스토리지 계정의 액세스 키
    • Blob 스토리지 계정 연결 문자열: 스토리지 계정 연결 문자열
    • SAS 토큰: Blob에서 생성된 SAS 토큰

Dataproc 서버리스를 사용한 데이터 처리

PySpark 작업 예시 살펴보기

다음 코드는 온도를 섭씨 1/10도에서 섭씨로 변환하는 PySpark 작업 예시입니다. 이 작업은 데이터 세트의 온도 데이터를 다른 형식으로 변환합니다.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Cloud Storage에 PySpark 파일 업로드

Cloud Storage에 PySpark 파일 업로드

  1. 로컬 머신에 data_analytics_process.py를 저장합니다.

  2. Google Cloud 콘솔에서 Cloud Storage 브라우저 페이지로 이동합니다.

    Cloud Storage 브라우저로 이동

  3. 앞에서 만든 버킷의 이름을 클릭합니다.

  4. 버킷의 객체 탭에서 파일 업로드 버튼을 클릭하고 나타나는 대화상자에서 data_analytics_process.py를 선택한 후 열기를 클릭합니다.

데이터 분석 DAG

예시 DAG 살펴보기

DAG는 여러 연산자를 사용하여 데이터를 변환하고 통합합니다.

  • AzureBlobStorageToGCSOperatorholidays.csv 파일을 Azure 블록 Blob에서 Cloud Storage 버킷으로 전송합니다.

  • GCSToBigQueryOperator는 Cloud Storage의 holidays.csv 파일을 앞에서 만든 BigQuery holidays_weather 데이터 세트의 새 테이블로 수집합니다.

  • DataprocCreateBatchOperator는 Dataproc 서버리스를 사용하여 PySpark 일괄 작업을 만들고 실행합니다.

  • BigQueryInsertJobOperator는 '날짜' 열에 있는 holidays.csv의 데이터를 BigQuery 공개 데이터 세트 ghcn_d의 날씨 데이터와 조인합니다. BigQueryInsertJobOperator 태스크는 for 루프를 사용하여 동적으로 생성되며 이러한 태스크는 TaskGroup에 있으므로 Airflow UI의 그래프 뷰에서 가독성이 향상됩니다.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

Airflow UI를 사용하여 변수 추가

Airflow에서 변수는 임의의 설정이나 구성을 간단한 키-값 스토어로 저장하고 검색하는 범용 방법입니다. 이 DAG는 Airflow 변수를 사용하여 공통 값을 저장합니다. 환경에 추가하려면 다음 안내를 따르세요.

  1. Cloud Composer 콘솔에서 Airflow UI에 액세스합니다.

  2. 관리자 > 변수로 이동합니다.

  3. 다음 변수를 추가합니다.

    • gcp_project: 프로젝트 ID입니다.

    • gcs_bucket: 앞에서 만든 버킷의 이름입니다(gs:// 프리픽스 제외).

    • gce_region: Dataproc 서버리스 네트워킹 요구사항을 충족하는 Dataproc 작업을 원하는 리전입니다. 이전에 비공개 Google 액세스를 사용 설정한 리전입니다.

    • dataproc_service_account: Cloud Composer 환경의 서비스 계정입니다. Cloud Composer 환경의 환경 구성 탭에서 이 서비스 계정을 찾을 수 있습니다.

    • azure_blob_name: 앞에서 만든 Blob의 이름입니다.

    • azure_container_name: 앞에서 만든 컨테이너의 이름입니다.

환경의 버킷에 DAG 업로드

Cloud Composer는 환경 버킷의 /dags 폴더에 있는 DAG를 예약합니다. Google Cloud 콘솔을 사용하여 DAG를 업로드하려면 다음 안내를 따르세요.

  1. 로컬 머신에서 azureblobstoretogcsoperator_tutorial.py를 저장합니다.

  2. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  3. 환경 목록의 DAG 폴더 열에서 DAG 링크를 클릭합니다. 환경의 DAG 폴더가 열립니다.

  4. 파일 업로드를 클릭합니다.

  5. 로컬 머신에서 azureblobstoretogcsoperator_tutorial.py를 선택하고 열기를 클릭합니다.

DAG 트리거

  1. Cloud Composer 환경에서 DAG 탭을 클릭합니다.

  2. DAG ID azure_blob_to_gcs_dag를 클릭합니다.

  3. DAG 트리거를 클릭합니다.

  4. 태스크가 성공적으로 완료되었음을 나타내는 녹색 체크 표시가 나타날 때까지 5~10분 정도 기다립니다.

DAG 성공 검증

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 탐색기 패널에서 프로젝트 이름을 클릭합니다.

  3. holidays_weather_joined를 클릭합니다.

  4. 결과 테이블을 보려면 미리보기를 클릭합니다. 값 열의 숫자는 섭씨 1/10도입니다.

  5. holidays_weather_normalized를 클릭합니다.

  6. 결과 테이블을 보려면 미리보기를 클릭합니다. 값 열의 숫자는 섭씨입니다.

삭제

이 튜토리얼에서 만든 개별 리소스를 삭제합니다.

다음 단계