Ejecuta un DAG de análisis de datos en Google Cloud con datos de Azure

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Este instructivo es una modificación de Ejecuta un DAG de análisis de datos en Google Cloud que muestra cómo conectar tu entorno de Cloud Composer a Microsoft Azure para usar los datos almacenados allí. Se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow. El DAG une los datos de un conjunto de datos públicos de BigQuery y un archivo CSV almacenado en Azure Blob Storage y, luego, ejecuta un trabajo por lotes de Dataproc Serverless para procesar los datos unidos.

El conjunto de datos públicos de BigQuery en este instructivo es ghcn_d, una base de datos integrada de resúmenes climáticos de todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de los días festivos de EE.UU. desde 1997 hasta 2021.

La pregunta que queremos responder es: “¿Qué tan cálido fue en Chicago durante el Día de Acción de Gracias durante los últimos 25 años?”.

Objetivos

  • Crea un entorno de Cloud Composer en la configuración predeterminada
  • Crea un BLOB en Azure
  • Crea un conjunto de datos vacío de BigQuery
  • Cree un nuevo bucket de Cloud Storage
  • Crea y ejecuta un DAG que incluya las siguientes tareas:
    • Cargar un conjunto de datos externo de Azure Blob Storage a Cloud Storage
    • Cargar un conjunto de datos externo de Cloud Storage a BigQuery
    • Unir dos conjuntos de datos en BigQuery
    • Ejecuta un trabajo de PySpark de análisis de datos

Antes de comenzar

Habilita las APIs

Habilita las siguientes APIs:

Consola

Habilita las API de Dataproc, Cloud Composer, BigQuery, Cloud Storage.

Habilita las API

gcloud

Habilita las APIs de Dataproc, Cloud Composer, BigQuery, Cloud Storage:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Otorga permisos

Otorga los siguientes roles y permisos a tu cuenta de usuario:

Crea y prepara tu entorno de Cloud Composer

  1. Crea un entorno de Cloud Composer con parámetros predeterminados:

  2. Otorga las siguientes funciones a la cuenta de servicio que se usa en tu entorno de Cloud Composer para que los trabajadores de Airflow ejecuten tareas de DAG de forma correcta:

    • Usuario de BigQuery (roles/bigquery.user)
    • Propietario de datos de BigQuery (roles/bigquery.dataOwner)
    • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)
    • Editor de Dataproc (roles/dataproc.editor)
    • Trabajador de Dataproc (roles/dataproc.worker)
  1. Instala el paquete de PyPI apache-airflow-providers-microsoft-azure en tu entorno de Cloud Composer.

  2. Crea un conjunto de datos vacío de BigQuery con los siguientes parámetros:

    • Nombre: holiday_weather
    • Región: US
  3. Crea un bucket de Cloud Storage nuevo en la multirregión US.

  4. Ejecuta el siguiente comando para habilitar el Acceso privado a Google en la subred predeterminada de la región en la que deseas ejecutar Dataproc sin servidores para cumplir con los requisitos de red. Te recomendamos usar la misma región que tu entorno de Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. Crea una cuenta de almacenamiento con la configuración predeterminada.

  2. Obtén la clave de acceso y la string de conexión para tu cuenta de almacenamiento.

  3. Crea un contenedor con opciones predeterminadas en la cuenta de almacenamiento que acabas de crear.

  4. Otorga el rol Delegador de BLOB de almacenamiento para el contenedor creado en el paso anterior.

  5. Sube holidays.csv para crear un BLOB en bloque con las opciones predeterminadas en el portal de Azure.

  6. Crea un token SAS para el BLOB en bloque que creaste en el paso anterior en el portal de Azure.

    • Método de firma: Clave de delegación de usuarios
    • Permisos: lectura
    • Dirección IP permitida: Ninguna
    • Protocolos permitidos: Solo HTTPS

Conéctate a Azure desde Cloud Composer

Agrega tu conexión de Microsoft Azure con la IU de Airflow:

  1. Ve a Administrador > Conexiones.

  2. Crea una conexión nueva con la siguiente configuración:

    • ID de conexión: azure_blob_connection
    • Tipo de conexión: Azure Blob Storage
    • Acceso al almacenamiento de BLOB: el nombre de tu cuenta de almacenamiento
    • Clave de almacenamiento de BLOB: La clave de acceso para tu cuenta de almacenamiento
    • Cadena de conexión de la cuenta de almacenamiento de BLOB: la cadena de conexión de tu cuenta de almacenamiento
    • SAS Token: Es el token SAS que se generó desde el BLOB.

Procesamiento de datos con Dataproc Serverless

Explora el trabajo de PySpark de ejemplo

El código que se muestra a continuación es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado en grados Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos a un formato diferente.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Sube el archivo de PySpark a Cloud Storage

Para subir el archivo de PySpark a Cloud Storage, sigue estos pasos:

  1. Guarda data_analytics_process.py en tu máquina local.

  2. En la consola de Google Cloud, ve a la página del navegador de Cloud Storage:

    Ir al navegador de Cloud Storage

  3. Haz clic en el nombre del bucket que creaste anteriormente.

  4. En la pestaña Objetos del bucket, haz clic en el botón Subir archivos, selecciona data_analytics_process.py en el cuadro de diálogo que aparece y haz clic en Abrir.

DAG de análisis de datos

Explora el DAG de ejemplo

El DAG usa múltiples operadores para transformar y unificar los datos:

  • El objeto AzureBlobStorageToGCSOperator transfiere el archivo holidays.csv de tu BLOB en bloque de Azure a tu bucket de Cloud Storage.

  • El GCSToBigQueryOperator transfiere el archivo holidays.csv desde Cloud Storage a una tabla nueva en el conjunto de datos holidays_weather de BigQuery que creaste antes.

  • El DataprocCreateBatchOperator crea y ejecuta un trabajo por lotes de PySpark con Dataproc sin servidores.

  • El BigQueryInsertJobOperator une los datos de holidays.csv en la columna "Fecha" con los datos meteorológicos del conjunto de datos públicos de BigQuery ghcn_d. Las tareas BigQueryInsertJobOperator se generan de forma dinámica con un bucle for, y estas se encuentran en un TaskGroup para mejorar la legibilidad en la Vista de gráfico de la IU de Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

Usa la IU de Airflow para agregar variables

En Airflow, las variables son una manera universal de almacenar y recuperar parámetros de configuración arbitrarios como un almacén simple de par clave-valor. Este DAG usa variables de Airflow para almacenar valores comunes. Para agregarlos a tu entorno, sigue estos pasos:

  1. Accede a la IU de Airflow desde la consola de Cloud Composer.

  2. Ve a Administrador > Variables.

  3. Agrega las siguientes variables:

    • gcp_project: Es el ID de tu proyecto.

    • gcs_bucket: Es el nombre del bucket que creaste antes (sin el prefijo gs://).

    • gce_region: Es la región en la que deseas que tu trabajo de Dataproc cumpla con los requisitos de herramientas de redes sin servidores de Dataproc. Esta es la región donde habilitaste el Acceso privado a Google anteriormente.

    • dataproc_service_account: Es la cuenta de servicio para tu entorno de Cloud Composer. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de Cloud Composer.

    • azure_blob_name: Es el nombre del BLOB que creaste antes.

    • azure_container_name: Es el nombre del contenedor que creaste antes.

Sube el DAG al bucket de tu entorno

Cloud Composer programa los DAG que se encuentran en la carpeta /dags del bucket de tu entorno. Para subir el DAG con la consola de Google Cloud, haz lo siguiente:

  1. En tu máquina local, guarda azureblobstoretogcsoperator_tutorial.py.

  2. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, en la columna Carpeta de DAG, haz clic en el vínculo DAG. Se abrirá la carpeta DAG de tu entorno.

  4. Haz clic en Subir archivos.

  5. Selecciona azureblobstoretogcsoperator_tutorial.py en tu máquina local y haz clic en Abrir.

Activa el DAG

  1. En tu entorno de Cloud Composer, haz clic en la pestaña DAG.

  2. Haz clic en el ID del DAG azure_blob_to_gcs_dag.

  3. Haz clic en Activar DAG.

  4. Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indique que las tareas se completaron de forma correcta.

Valida el éxito del DAG

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, haz clic en el nombre de tu proyecto.

  3. Haz clic en holidays_weather_joined.

  4. Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en la columna de valores están en décimas de grado Celsius.

  5. Haz clic en holidays_weather_normalized.

  6. Haz clic en la vista previa para ver la tabla resultante. Ten en cuenta que los números en la columna de valores están en grados Celsius.

Limpieza

Borra los recursos individuales que creaste para este instructivo:

¿Qué sigue?