Esegui un DAG di analisi dei dati in Google Cloud utilizzando i dati di AWS

Cloud Composer 1 | Cloud Composer 2

Questo tutorial è una modifica di Esegui un DAG di analisi dei dati in Google Cloud che mostra come connettere il tuo ambiente Cloud Composer ad Amazon Web Services per utilizzare i dati archiviati. Mostra come utilizzare Cloud Composer per creare un DAG Apache Airflow. Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV archiviato in un bucket S3 Amazon Web Services (AWS) e poi esegue un job batch Dataproc Serverless per elaborare i dati uniti.

Il set di dati pubblico BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici in tutto il mondo. Il file CSV contiene informazioni sulle date e i nomi delle festività degli Stati Uniti dal 1997 al 2021.

La domanda a cui vogliamo rispondere utilizzando il DAG è: "Che caldo ha fatto a Chicago il Giorno del Ringraziamento negli ultimi 25 anni?"

Obiettivi

  • Crea un ambiente Cloud Composer nella configurazione predefinita
  • Crea un bucket in AWS S3
  • crea un set di dati BigQuery vuoto
  • Crea un nuovo bucket Cloud Storage
  • Crea ed esegui un DAG che includa le seguenti attività:
    • Carica un set di dati esterno da S3 a Cloud Storage
    • Carica un set di dati esterno da Cloud Storage a BigQuery
    • Unire due set di dati in BigQuery
    • Esegui un job PySpark di analisi dei dati

Prima di iniziare

Gestisci le autorizzazioni in AWS

  1. Crea un account AWS.

  2. Segui la sezione "Creazione di criteri con l'editor visivo" del tutorial per AWS sulla creazione di criteri IAM per creare un criterio IAM personalizzato per AWS S3 con la seguente configurazione:

    • Servizio: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets), per la visualizzazione del bucket S3
    • CreateBucket (s3:CreateBucket), per la creazione di un bucket
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), per creare un bucket
    • ListBucket (s3:ListBucket), per concedere l'autorizzazione per elencare gli oggetti in un bucket S3
    • PutObject (s3:PutObject), per il caricamento di file in un bucket
    • GetBucketVersioning (s3:GetBucketVersioning), per eliminare un oggetto in un bucket
    • DeleteObject (s3:DeleteObject), per eliminare un oggetto in un bucket
    • ListBucketVersions (s3:ListBucketVersions), per eliminare un bucket
    • DeleteBucket (s3:DeleteBucket), per eliminare un bucket
    • Risorse: scegli "Qualsiasi" accanto a "bucket" e "oggetto" per concedere le autorizzazioni a tutte le risorse di quel tipo.
    • Tag: nessuno
    • Nome: TutorialPolicy

    Per ulteriori informazioni su ogni configurazione descritta in precedenza, consulta l'elenco delle azioni supportate in Amazon S3.

  3. Aggiungere il criterio IAM TutorialPolicy alla tua identità

Abilita le API

Abilita le seguenti API:

Console

Abilita le API Dataproc, Cloud Composer, BigQuery, Cloud Storage.

Abilita le API

gcloud

Abilita le API Dataproc, Cloud Composer, BigQuery, Cloud Storage.

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Concedi le autorizzazioni

Concedi i ruoli e le autorizzazioni seguenti al tuo account utente:

Crea e prepara il tuo ambiente Cloud Composer

  1. Crea un ambiente Cloud Composer con parametri predefiniti:

  2. Concedi i ruoli seguenti all'account di servizio utilizzato nel tuo ambiente Cloud Composer affinché i worker di Airflow possano eseguire correttamente le attività DAG:

    • Utente BigQuery (roles/bigquery.user)
    • Proprietario dati BigQuery (roles/bigquery.dataOwner)
    • Utente account di servizio (roles/iam.serviceAccountUser)
    • Editor Dataproc (roles/dataproc.editor)
    • Worker Dataproc (roles/dataproc.worker)
  1. Installa il pacchetto PyPI apache-airflow-providers-amazon nel tuo ambiente Cloud Composer.

  2. Crea un set di dati BigQuery vuoto con i seguenti parametri:

    • Nome: holiday_weather
    • Regione: US
  3. Crea un nuovo bucket Cloud Storage nella località multiregionale US.

  4. Esegui questo comando per abilitare l'accesso privato Google nella subnet predefinita nella regione in cui vuoi eseguire Dataproc Serverless per soddisfare i requisiti di rete. Ti consigliamo di utilizzare la stessa regione del tuo ambiente Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Crea un bucket S3 con le impostazioni predefinite nella tua regione preferita.

Connettiti ad AWS da Cloud Composer

  1. Recuperare l'ID della chiave di accesso AWS e la chiave di accesso segreta
  2. Aggiungi la connessione AWS S3 utilizzando l'interfaccia utente di Airflow:

    1. Vai ad Amministrazione > Connessioni.
    2. Crea una nuova connessione con la seguente configurazione:

      • ID connessione: aws_s3_connection
      • Tipo di connessione: Amazon S3
      • Extra: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Elaborazione dati con Dataproc Serverless

Esplora il job PySpark di esempio

Il codice mostrato di seguito è un esempio di job PySpark che converte la temperatura da decimi di grado in Celsius a gradi Celsius. Questo job converte i dati sulla temperatura dal set di dati in un formato diverso.

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Carica il file PySpark su Cloud Storage

Per caricare il file PySpark in Cloud Storage:

  1. Salva data_analytics_process.py sulla tua macchina locale.

  2. Nella console Google Cloud, vai alla pagina Browser Cloud Storage:

    Vai al browser Cloud Storage

  3. Fai clic sul nome del bucket che hai creato in precedenza.

  4. Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file, seleziona data_analytics_process.py nella finestra di dialogo visualizzata e fai clic su Apri.

Carica il file CSV su AWS S3

Per caricare il file holidays.csv:

  1. Risparmia holidays.csv sulla macchina locale.
  2. Segui la guida di AWS per caricare il file nel bucket.

DAG di analisi dei dati

Esplora il DAG di esempio

Il DAG utilizza più operatori per trasformare e unificare i dati:

  • S3ToGCSOperator trasferisce il file holidays.csv dal bucket AWS S3 al bucket Cloud Storage.

  • GCSToBigQueryOperator importa il file holidays.csv da Cloud Storage in una nuova tabella nel set di dati BigQueryholidays_weather che hai creato in precedenza.

  • DataprocCreateBatchOperator crea ed esegue un job batch PySpark utilizzando Dataproc Serverless.

  • BigQueryInsertJobOperator unisce i dati di holidays.csv nella colonna "Data" ai dati meteo del set di dati pubblico BigQuery ghcn_d. Le attività BigQueryInsertJobOperator vengono generate dinamicamente utilizzando un ciclo for e si trovano in un TaskGroup per una migliore leggibilità nella visualizzazione del grafico dell'interfaccia utente di Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Utilizzare l'interfaccia utente di Airflow per aggiungere variabili

In Airflow, le variabili offrono un modo universale per archiviare e recuperare impostazioni o configurazioni arbitrarie come un semplice archivio chiave-valore. Questo DAG utilizza le variabili Airflow per archiviare i valori comuni. Per aggiungerli al tuo ambiente:

  1. Accedi all'UI di Airflow dalla console di Cloud Composer.

  2. Vai ad Amministrazione > Variabili.

  3. Aggiungi le seguenti variabili:

    • s3_bucket: il nome del bucket S3 che hai creato in precedenza.

    • gcp_project: il tuo ID progetto.

    • gcs_bucket: il nome del bucket che hai creato in precedenza (senza il prefisso gs://).

    • gce_region: la regione in cui vuoi che il job Dataproc soddisfi i requisiti di networking di Dataproc Serverless. Questa è la regione in cui hai attivato l'accesso privato Google in precedenza.

    • dataproc_service_account: l'account di servizio per il tuo ambiente Cloud Composer. Puoi trovare questo account di servizio nella scheda di configurazione dell'ambiente per il tuo ambiente Cloud Composer.

Carica il DAG nel bucket del tuo ambiente

Cloud Composer pianifica i DAG che si trovano nella cartella /dags nel bucket del tuo ambiente. Per caricare il DAG utilizzando la console Google Cloud:

  1. Salva s3togcsoperator_tutorial.py sulla macchina locale.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai a Ambienti

  3. Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Si apre la cartella DAG del tuo ambiente.

  4. Fai clic su Carica file.

  5. Seleziona s3togcsoperator_tutorial.py sulla macchina locale e fai clic su Apri.

Attiva il DAG

  1. Nel tuo ambiente Cloud Composer, fai clic sulla scheda DAG.

  2. Fai clic sull'ID DAG s3_to_gcs_dag.

  3. Fai clic su Attiva DAG.

  4. Attendi circa 5-10 minuti finché non viene visualizzato un segno di spunta verde che indica che le attività sono state completate correttamente.

Convalidare il successo del DAG

  1. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, fai clic sul nome del progetto.

  3. Fai clic su holidays_weather_joined.

  4. Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna del valore sono in decimi di grado Celsius.

  5. Fai clic su holidays_weather_normalized.

  6. Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella colonna del valore sono in gradi Celsius.

esegui la pulizia

Elimina le singole risorse che hai creato per questo tutorial:

Passaggi successivi