Esegui un DAG di analisi dei dati in Google Cloud utilizzando i dati di Azure

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questo tutorial è una modifica di Eseguire un DAG di analisi dei dati in Google Cloud che mostra come collegare l'ambiente Cloud Composer a Microsoft Azure per utilizzare i dati memorizzati al suo interno. Mostra come utilizzare a Cloud Composer per creare DAG Apache Airflow. La Il DAG unisce i dati di un set di dati pubblico BigQuery e di un file CSV archiviato in un Archiviazione BLOB di Azure ed esegue un job batch Dataproc serverless per elaborare e i dati di Google Cloud.

Il set di dati pubblico di BigQuery in questo tutorial è ghcn_d, un database integrato di riepiloghi climatici in tutto il mondo. Il file CSV contiene informazioni sulle date e i nomi delle festività statunitensi dal 1997 al 2021.

La domanda a cui vogliamo rispondere usando il DAG è: "Quanto era caldo a Chicago il Ringraziamento negli ultimi 25 anni?"

Obiettivi

  • Crea un ambiente Cloud Composer nella configurazione predefinita
  • Creare un blob in Azure
  • crea un set di dati BigQuery vuoto
  • Crea un nuovo bucket Cloud Storage
  • Crea ed esegui un DAG che includa le seguenti attività:
    • Carica un set di dati esterno da Azure Blob Storage in Cloud Storage
    • Carica un set di dati esterno da Cloud Storage in BigQuery
    • Unire due set di dati in BigQuery
    • Esegui un job PySpark di analisi dei dati

Prima di iniziare

Abilita API

Abilita le seguenti API:

Console

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Concedi le autorizzazioni

Concedi i ruoli e le autorizzazioni seguenti al tuo account utente:

Crea e prepara il tuo ambiente Cloud Composer

  1. Crea un ambiente Cloud Composer con impostazione predefinita parametri:

    di Gemini Advanced.
  2. Concedi i seguenti ruoli all'account di servizio utilizzato in dell'ambiente Cloud Composer affinché i worker di Airflow possano eseguire correttamente le attività DAG:

    • Utente BigQuery (roles/bigquery.user)
    • Proprietario dei dati BigQuery (roles/bigquery.dataOwner)
    • Utente account di servizio (roles/iam.serviceAccountUser)
    • Editor Dataproc (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. Installa il apache-airflow-providers-microsoft-azure pacchetto PyPI nel tuo ambiente Cloud Composer.

  2. Crea un set di dati BigQuery vuoto con i seguenti parametri:

    • Nome: holiday_weather
    • Regione: US
  3. Crea un nuovo bucket Cloud Storage nella regione multipla US.

  4. Esegui il seguente comando per abilitare l'accesso privato Google nella subnet predefinita della regione in cui vuoi eseguire Dataproc Serverless per soddisfare i requisiti di rete. Me di utilizzare la stessa regione di Cloud Composer completamente gestito di Google Cloud.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
di Gemini Advanced.
  1. Crea un account di archiviazione con le impostazioni predefinite.

  2. Ottieni la chiave di accesso e la stringa di connessione per il tuo account di archiviazione.

  3. Creare un contenitore con le opzioni predefinite nell'account di archiviazione appena creato.

  4. Concede il ruolo Storage Blob Delegator per il contenitore creato nel passaggio precedente.

  5. Carica holidays.csv in crea un BLOB in blocco con opzioni predefinite nel portale Azure.

  6. Crea un token SAS per il blob di blocchi che hai creato nel passaggio precedente nel portale Azure.

    • Metodo di firma: chiave di delega utente
    • Autorizzazioni: lettura
    • Indirizzo IP consentito: nessuno
    • Protocolli consentiti: solo HTTPS

Connettiti ad Azure da Cloud Composer

Aggiungi il tuo Microsoft Azure connessione usando la UI di Airflow:

  1. Vai ad Amministrazione > Connessioni.

  2. Crea una nuova connessione con la seguente configurazione:

    • ID connessione: azure_blob_connection
    • Tipo di connessione: Azure Blob Storage
    • Accesso Archiviazione blob: il nome del tuo account di archiviazione
    • Chiave di archiviazione BLOB:la chiave di accesso per l'account di archiviazione.
    • Stringa di connessione dell'account di archiviazione blob: la stringa di connessione dell'account di archiviazione
    • Token SAS: il token SAS generato dal tuo blob

Elaborazione dei dati utilizzando Dataproc Serverless

Esplora il job PySpark di esempio

Il codice mostrato di seguito è un job PySpark di esempio che converte la temperatura da da decimi di grado in Celsius a gradi Celsius. Questo job converte di temperatura del set di dati in un formato diverso.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Carica il file PySpark in Cloud Storage

Per caricare il file PySpark su Cloud Storage:

  1. Salva data_analytics_process.py al tuo computer locale.

  2. Nella console Google Cloud, vai alla pagina Browser Cloud Storage:

    Vai al browser Cloud Storage

  3. Fai clic sul nome del bucket creato in precedenza.

  4. Nella scheda Oggetti del bucket, fai clic sul pulsante Carica file, selezionate data_analytics_process.py nella finestra di dialogo visualizzata e fai clic su Apri.

DAG di analisi dei dati

Esplora il DAG di esempio

Il DAG utilizza più operatori per trasformare e unificare i dati:

di Gemini Advanced.
import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

Utilizzare l'UI di Airflow per aggiungere variabili

In Airflow, variables rappresentano un metodo universale per memorizzare e recuperare impostazioni arbitrarie o un semplice archivio di coppie chiave-valore. Questo DAG utilizza le variabili Airflow per memorizzare i valori comuni. Per aggiungerle al tuo ambiente:

  1. Accedi all'interfaccia utente di Airflow dalla console Cloud Composer.

  2. Vai ad Amministrazione > Variabili.

  3. Aggiungi le seguenti variabili:

    • gcp_project: il tuo ID progetto.

    • gcs_bucket: il nome del bucket che hai creato in precedenza (senza il prefisso gs://).

    • gce_region: la regione in cui vuoi che il tuo job Dataproc soddisfi i requisiti di rete di Dataproc Serverless. Si tratta della regione in cui hai attivato l'accesso privato Google in precedenza.

    • dataproc_service_account: l'account di servizio per il tuo ambiente Cloud Composer. Puoi trovare questo servizio nella scheda di configurazione dell'ambiente nell'ambiente Cloud Composer.

    • azure_blob_name: il nome del blob creato in precedenza.

    • azure_container_name: il nome del contenitore creato in precedenza.

Carica il DAG nel bucket del tuo ambiente

Cloud Composer pianifica i DAG che si trovano /dags cartella nel bucket dell'ambiente. Per caricare il DAG utilizzando la console Google Cloud:

  1. Sulla tua macchina locale, salva azureblobstoretogcsoperator_tutorial.py.

  2. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  3. Nell'elenco degli ambienti, nella colonna Cartella DAG, fai clic sul link DAG. Viene aperta la cartella DAG del tuo ambiente.

  4. Fai clic su Carica file.

  5. Seleziona azureblobstoretogcsoperator_tutorial.py sulla tua macchina locale e fai clic su Apri.

Attiva il DAG

  1. Nell'ambiente Cloud Composer, fai clic sulla scheda DAG.

  2. Fai clic sull'ID DAG azure_blob_to_gcs_dag.

  3. Fai clic su Attiva DAG.

  4. Attendi circa cinque-dieci minuti finché non viene visualizzato un segno di spunta verde che indica che le attività sono state completate correttamente.

Convalida l'esito del DAG

  1. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nel riquadro Explorer, fai clic sul nome del tuo progetto.

  3. Fai clic su holidays_weather_joined.

  4. Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella sono in decimi di grado Celsius.

  5. Fai clic su holidays_weather_normalized.

  6. Fai clic su Anteprima per visualizzare la tabella risultante. Tieni presente che i numeri nella sono in gradi Celsius.

Esegui la pulizia

Elimina le singole risorse che hai creato per questo tutorial:

Passaggi successivi