Ejecutar un DAG de análisis de datos en Google Cloud con datos de AWS

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial es una modificación de Ejecutar un DAG de analíticas de datos en Google Cloud, que muestra cómo conectar su entorno de Cloud Composer a Amazon Web Services para utilizar los datos almacenados allí. En él se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow. El DAG combina datos de un conjunto de datos público de BigQuery y de un archivo CSV almacenado en un segmento de Amazon Web Services (AWS) S3 y, a continuación, ejecuta una tarea por lotes de Dataproc Serverless para procesar los datos combinados.

El conjunto de datos público de BigQuery de este tutorial es ghcn_d, una base de datos integrada de resúmenes climáticos de todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de las festividades de EE. UU. entre 1997 y 2021.

La pregunta a la que queremos responder con el DAG es: "¿Qué temperatura hizo en Chicago el día de Acción de Gracias durante los últimos 25 años?".

Objetivos

  • Crear un entorno de Cloud Composer con la configuración predeterminada
  • Crear un segmento en AWS S3
  • Crear un conjunto de datos de BigQuery vacío
  • Crear un segmento de Cloud Storage
  • Crea y ejecuta un DAG que incluya las siguientes tareas:
    • Cargar un conjunto de datos externo de S3 a Cloud Storage
    • Cargar un conjunto de datos externo de Cloud Storage en BigQuery
    • Combinar dos conjuntos de datos en BigQuery
    • Ejecutar una tarea de PySpark de analíticas de datos

Antes de empezar

Gestionar permisos en AWS

  1. Crea una cuenta de AWS.

  2. Sigue la sección "Crear políticas con el editor visual" del tutorial de AWS sobre la creación de políticas de gestión de identidades y accesos para crear una política de gestión de identidades y accesos personalizada para AWS S3 con la siguiente configuración:

    • Servicio: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets) para ver tu segmento de S3
    • CreateBucket (s3:CreateBucket) para crear un segmento
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls) para crear un segmento
    • ListBucket (s3:ListBucket) para conceder permiso para enumerar objetos en un segmento de S3
    • PutObject (s3:PutObject) para subir archivos a un contenedor
    • GetBucketVersioning (s3:GetBucketVersioning), para eliminar un objeto de un segmento
    • DeleteObject (s3:DeleteObject): para eliminar un objeto de un contenedor
    • ListBucketVersions (s3:ListBucketVersions) para eliminar un cubo
    • DeleteBucket (s3:DeleteBucket) para eliminar un segmento
    • Recursos: elija "Cualquiera" junto a "Contenedor" y "Objeto" para conceder permisos a cualquier recurso de ese tipo.
    • Etiqueta: ninguna
    • Nombre: TutorialPolicy

    Consulta la lista de acciones admitidas en Amazon S3 para obtener más información sobre cada configuración que se indica más arriba.

  3. Añade la política de gestión de identidades y accesos TutorialPolicy a tu identidad

Habilitar APIs

Habilita las siguientes APIs:

Consola

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Conceder permisos

Concede los siguientes roles y permisos a tu cuenta de usuario:

Crear y preparar un entorno de Cloud Composer

  1. Crea un entorno de Cloud Composer con los parámetros predeterminados:

  2. Asigna los siguientes roles a la cuenta de servicio que se usa en tu entorno de Cloud Composer para que los workers de Airflow puedan ejecutar correctamente las tareas de los DAG:

    • Usuario de BigQuery (roles/bigquery.user)
    • Propietario de datos de BigQuery (roles/bigquery.dataOwner)
    • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)
    • Editor de Dataproc (roles/dataproc.editor)
    • Trabajador de Dataproc (roles/dataproc.worker)
  1. Instala el apache-airflow-providers-amazon paquete de PyPI en tu entorno de Cloud Composer.

  2. Crea un conjunto de datos de BigQuery vacío con los siguientes parámetros:

    • Nombre: holiday_weather
    • Región: US
  3. Crea un segmento de Cloud Storage en la multirregión US.

  4. Ejecuta el siguiente comando para habilitar el acceso privado a Google en la subred predeterminada de la región en la que quieras ejecutar Dataproc sin servidor para cumplir los requisitos de red. Te recomendamos que uses la misma región que tu entorno de Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Crea un segmento de S3 con la configuración predeterminada en la región que prefieras.

Conectarse a AWS desde Cloud Composer

  1. Obtener el ID de clave de acceso y la clave de acceso secreta de AWS
  2. Añade tu conexión de AWS S3 con la interfaz de usuario de Airflow:

    1. Vaya a Administrar > Conexiones.
    2. Crea una conexión con la siguiente configuración:

      • ID de conexión: aws_s3_connection
      • Tipo de conexión: Amazon S3
      • Extras: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Procesamiento de datos con Dataproc Serverless

Consultar el ejemplo de tarea de PySpark

El código que se muestra a continuación es un ejemplo de trabajo de PySpark que convierte la temperatura de décimas de grado Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos a otro formato.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Sube el archivo PySpark a Cloud Storage

Para subir el archivo PySpark a Cloud Storage, sigue estos pasos:

  1. Guarda data_analytics_process.py en tu máquina local.

  2. En la Google Cloud consola, ve a la página Navegador de Cloud Storage:

    Ir al navegador de Cloud Storage

  3. Haga clic en el nombre del segmento que ha creado anteriormente.

  4. En la pestaña Objetos del contenedor, haga clic en el botón Subir archivos, seleccione data_analytics_process.py en el cuadro de diálogo que aparece y haga clic en Abrir.

Subir el archivo CSV a AWS S3

Para subir el archivo holidays.csv, sigue estos pasos:

  1. Guarda holidays.csv en tu máquina local.
  2. Sigue la guía de AWS para subir el archivo al bucket.

DAG de analíticas de datos

Explorar el DAG de ejemplo

El DAG usa varios operadores para transformar y unificar los datos:

  • El comando S3ToGCSOperator transfiere el archivo holidays.csv de tu segmento de AWS S3 a tu segmento de Cloud Storage.

  • El GCSToBigQueryOperator ingiere el archivo holidays.csv de Cloud Storage en una tabla nueva del conjunto de datos holidays_weather de BigQuery que has creado anteriormente.

  • El comando DataprocCreateBatchOperator crea y ejecuta una tarea por lotes de PySpark con Dataproc sin servidor.

  • La BigQueryInsertJobOperator combina los datos de holidays.csv de la columna "Date" con los datos meteorológicos del conjunto de datos público de BigQuery ghcn_d. Las tareas BigQueryInsertJobOperator se generan de forma dinámica mediante un bucle for y se encuentran en un TaskGroup para que sean más fáciles de leer en la vista de gráfico de la interfaz de usuario de Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Usar la interfaz de usuario de Airflow para añadir variables

En Airflow, las variables son una forma universal de almacenar y recuperar ajustes o configuraciones arbitrarios como un simple almacén de valores clave. Este DAG usa variables de Airflow para almacenar valores comunes. Para añadirlos a tu entorno, sigue estos pasos:

  1. Acceder a la interfaz de usuario de Airflow desde la consola de Cloud Composer

  2. Vaya a Administrar > Variables.

  3. Añade las siguientes variables:

    • s3_bucket: el nombre del segmento de S3 que has creado anteriormente.

    • gcp_project: tu ID de proyecto.

    • gcs_bucket: el nombre del segmento que has creado anteriormente (sin el prefijo gs://).

    • gce_region: la región en la que quieres que se ejecute el trabajo de Dataproc que cumpla los requisitos de red de Dataproc Serverless. Es la región en la que has habilitado el acceso privado de Google anteriormente.

    • dataproc_service_account: la cuenta de servicio de tu entorno de Cloud Composer. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de Cloud Composer.

Subir el DAG al segmento de tu entorno

Cloud Composer programa los DAGs que se encuentran en la carpeta /dags del segmento de tu entorno. Para subir el DAG con la consola, sigue estos pasos: Google Cloud

  1. En tu máquina local, guarda s3togcsoperator_tutorial.py.

  2. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  3. En la lista de entornos, en la columna Carpeta DAG, haga clic en el enlace DAGs. Se abrirá la carpeta DAGs de tu entorno.

  4. Haz clic en Subir archivos.

  5. Selecciona s3togcsoperator_tutorial.py en tu equipo local y haz clic en Abrir.

Activar el DAG

  1. En tu entorno de Cloud Composer, haz clic en la pestaña DAGs (DAGs).

  2. Haz clic en el ID de DAG s3_to_gcs_dag.

  3. Haz clic en Activar DAG.

  4. Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indica que las tareas se han completado correctamente.

Validar que el DAG se ha completado correctamente

  1. En la consola, ve a la página BigQuery. Google Cloud

    Ir a BigQuery

  2. En el panel Explorador, haz clic en el nombre de tu proyecto.

  3. Haz clic en holidays_weather_joined.

  4. Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en décimas de grado Celsius.

  5. Haz clic en holidays_weather_normalized.

  6. Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna "Valor" están en grados Celsius.

Limpieza

Elimina los recursos que hayas creado para este tutorial:

Siguientes pasos