Datenanalyse-DAG in Google Cloud mit Daten aus Azure ausführen

Cloud Composer 1 Cloud Composer 2

Diese Anleitung ist eine Änderung des Abschnitts Datenanalyse-DAG in Google Cloud ausführen. Sie zeigt, wie Sie Ihre Cloud Composer-Umgebung mit Microsoft Azure verbinden, um die dort gespeicherten Daten zu verwenden. Es wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG erstellen. Der DAG führt Daten aus einem öffentlichen BigQuery-Dataset mit einer CSV-Datei zusammen, die in einem Azure Blob Storage gespeichert sind, und führt dann einen Dataproc Serverless-Batchjob aus, um die verknüpften Daten zu verarbeiten.

Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit globalen Klimazusammenfassungen. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.

Die Frage, die wir mit dem DAG beantworten möchten, lautet: „Wie warm war es in Chicago am Thanksgiving in den letzten 25 Jahren?“

Lernziele

  • Cloud Composer-Umgebung in der Standardkonfiguration erstellen
  • Blob in Azure erstellen
  • Leeres BigQuery-Dataset erstellen
  • Neuen Cloud Storage-Bucket erstellen
  • Erstellen Sie einen DAG, der die folgenden Aufgaben enthält, und führen Sie ihn aus:
    • Externes Dataset aus Azure Blob Storage in Cloud Storage laden
    • Externes Dataset aus Cloud Storage in BigQuery laden
    • Zwei Datasets in BigQuery verknüpfen
    • PySpark-Data-Analytics-Job ausführen

Hinweise

APIs aktivieren

Aktivieren Sie folgende APIs:

Console

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Berechtigungen erteilen

Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:

Cloud Composer-Umgebung erstellen und vorbereiten

  1. Erstellen Sie eine Cloud Composer-Umgebung mit Standardparametern:

  2. Weisen Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:

    • BigQuery-Nutzer (roles/bigquery.user)
    • BigQuery-Dateninhaber (roles/bigquery.dataOwner)
    • Dienstkontonutzer (roles/iam.serviceAccountUser)
    • Dataproc-Editor (roles/dataproc.editor)
    • Dataproc-Worker (roles/dataproc.worker)
  1. Installieren Sie das PyPI-Paket apache-airflow-providers-microsoft-azure in Ihrer Cloud Composer-Umgebung.

  2. Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:

    • Name: holiday_weather
    • Region: US
  3. Erstellen Sie einen neuen Cloud Storage-Bucket am multiregionalen Standort US.

  4. Führen Sie den folgenden Befehl aus, um den privaten Google-Zugriff im Standardsubnetz der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Wir empfehlen, dieselbe Region wie in Ihrer Cloud Composer-Umgebung zu verwenden.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    
  1. Erstellen Sie ein Speicherkonto mit den Standardeinstellungen.

  2. Rufen Sie den Zugriffsschlüssel und den Verbindungsstring für Ihr Speicherkonto ab.

  3. Erstellen Sie einen Container mit Standardoptionen in Ihrem neu erstellten Speicherkonto.

  4. Weisen Sie dem im vorherigen Schritt erstellten Container die Rolle Storage Blob Delegator zu.

  5. Laden Sie holidays.csv hoch, um ein Block-Blob mit Standardoptionen im Azure-Portal zu erstellen.

  6. Erstellen Sie ein SAS-Token für das Block-Blob, das Sie im vorherigen Schritt im Azure-Portal erstellt haben.

    • Signierungsmethode: Schlüssel für die Nutzerdelegierung
    • Berechtigungen: Lesen
    • Zulässige IP-Adresse: keine
    • Zulässige Protokolle: Nur HTTPS

Verbindung zu Azure von Cloud Composer herstellen

Fügen Sie über die Airflow-UI die Microsoft Azure-Verbindung hinzu:

  1. Klicken Sie auf Admin > Verbindungen.

  2. Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:

    • Verbindungs-ID: azure_blob_connection
    • Verbindungstyp: Azure Blob Storage
    • Blob Storage Log-in:Name Ihres Speicherkontos
    • Blob-Speicherschlüssel:der Zugriffsschlüssel für Ihr Speicherkonto
    • Blob Storage Account Connection String (Verbindungsstring für das Blob-Speicherkonto): Ihr Verbindungsstring für das Speicherkonto.
    • SAS-Token:das von Ihrem Blob generierte SAS-Token

Datenverarbeitung mit Dataproc Serverless

PySpark-Beispiel-Job ansehen

Der folgende Code ist ein PySpark-Beispieljob, der die Temperatur von Zehntelgrad Celsius in Grad Celsius umwandelt. Dieser Job wandelt Temperaturdaten aus dem Dataset in ein anderes Format um.

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

PySpark-Datei in Cloud Storage hochladen

So laden Sie die PySpark-Datei in Cloud Storage hoch:

  1. Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.

  2. Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:

    Zum Cloud Storage-Browser

  3. Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.

  4. Klicken Sie im Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld data_analytics_process.py aus und klicken Sie auf Öffnen.

Datenanalyse-DAG

Beispiel-DAG ansehen

Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:

  • AzureBlobStorageToGCSOperator überträgt die Datei holidays.csv aus dem Azure-Block-Blob in Ihren Cloud Storage-Bucket.

  • Mit GCSToBigQueryOperator wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Dataset holidays_weather aufgenommen, das Sie zuvor erstellt haben.

  • Der DataprocCreateBatchOperator erstellt und führt einen PySpark-Batchjob mit Dataproc Serverless aus.

  • Mit BigQueryInsertJobOperator werden die Daten aus holidays.csv in der Spalte „Date“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d zusammengeführt. Die BigQueryInsertJobOperator-Aufgaben werden mit einer For-Schleife dynamisch generiert. Zur besseren Lesbarkeit in der Grafikansicht der Airflow-UI befinden sich diese Aufgaben in einer TaskGroup.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs import (
    AzureBlobStorageToGCSOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# Azure configs
AZURE_BLOB_NAME = "{{var.value.azure_blob_name}}"
AZURE_CONTAINER_NAME = "{{var.value.azure_container_name}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "azure_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    azure_blob_to_gcs = AzureBlobStorageToGCSOperator(
        task_id="azure_blob_to_gcs",
        # Azure args
        blob_name=AZURE_BLOB_NAME,
        container_name=AZURE_CONTAINER_NAME,
        wasb_conn_id="azure_blob_connection",
        filename=f"https://console.cloud.google.com/storage/browser/{BUCKET_NAME}/",
        # GCP args
        gcp_conn_id="google_cloud_default",
        object_name="holidays.csv",
        bucket_name=BUCKET_NAME,
        gzip=False,
        impersonation_chain=None,
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        azure_blob_to_gcs >> load_external_dataset >> bq_join_group >> create_batch

Variablen über die Airflow-UI hinzufügen

In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfachen Speicher für Schlüssel/Wert-Paare zu speichern und abzurufen. Dieser DAG verwendet Airflow-Variablen zum Speichern gängiger Werte. So fügen Sie sie zu Ihrer Umgebung hinzu:

  1. Rufen Sie über die Cloud Composer-Konsole auf die Airflow-UI auf.

  2. Klicken Sie auf Verwaltung > Variablen.

  3. Fügen Sie die folgenden Variablen hinzu:

    • gcp_project: Ihre Projekt-ID.

    • gcs_bucket: der Name des zuvor erstellten Buckets (ohne das Präfix gs://).

    • gce_region: Die Region, in der Ihr Dataproc-Job ausgeführt werden soll, der die Anforderungen an das serverlose Dataproc-Netzwerk erfüllt. Das ist die Region, in der Sie zuvor den privaten Google-Zugriff aktiviert haben.

    • dataproc_service_account: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab für die Umgebungskonfiguration für Ihre Cloud Composer-Umgebung.

    • azure_blob_name: der Name des Blobs, das Sie zuvor erstellt haben.

    • azure_container_name: der Name des Containers, den Sie zuvor erstellt haben.

DAG in den Bucket Ihrer Umgebung hochladen

Cloud Composer plant DAGs, die sich im Ordner /dags im Bucket Ihrer Umgebung befinden. So laden Sie den DAG über die Google Cloud Console hoch:

  1. Speichern Sie auf Ihrem lokalen Computer azureblobstoretogcsoperator_tutorial.py.

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  3. Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.

  4. Klicken Sie auf Dateien hochladen.

  5. Wählen Sie azureblobstoretogcsoperator_tutorial.py auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.

DAG auslösen

  1. Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.

  2. Klicken Sie auf die DAG-ID azure_blob_to_gcs_dag.

  3. Klicken Sie auf Trigger DAG (DAG auslösen).

  4. Warten Sie etwa fünf bis zehn Minuten, bis Sie ein grünes Häkchen sehen, das anzeigt, dass die Aufgaben erfolgreich abgeschlossen wurden.

Erfolg des DAG validieren

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.

  3. Klicken Sie auf holidays_weather_joined.

  4. Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Wertspalte in Zehntelgrad Celsius angegeben sind.

  5. Klicken Sie auf holidays_weather_normalized.

  6. Klicken Sie auf „Vorschau“, um die resultierende Tabelle anzusehen. Beachten Sie, dass die Zahlen in der Spalte Werte in Grad Celsius angegeben sind.

Bereinigen

Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:

Nächste Schritte