Datenanalyse-DAG in Google Cloud ausführen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

In dieser Anleitung wird gezeigt, wie Sie mit Cloud Composer ein Apache Airflow-DAG Die DAG verknüpft Daten aus einem öffentlichen BigQuery-Dataset mit einer gespeicherten CSV-Datei in einem Cloud Storage-Bucket Serverloser Dataproc-Batchjob zum Verarbeiten der zusammengeführten Daten.

Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, einer integrierten Datenbank mit Klimazusammenfassungen auf der ganzen Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von Feiertagen in den USA von 1997 bis 2021.

Die Frage, die wir mithilfe des DAG beantworten möchten, lautet: „Wie warm war es in Chicago? in den letzten 25 Jahren an Thanksgiving?“

Lernziele

  • Cloud Composer-Umgebung in der Standardkonfiguration erstellen
  • Leeres BigQuery-Dataset erstellen
  • Neuen Cloud Storage-Bucket erstellen
  • Erstellen Sie einen DAG und führen Sie ihn aus, der die folgenden Aufgaben enthält: <ph type="x-smartling-placeholder">
      </ph>
    • Externes Dataset aus Cloud Storage laden in BigQuery
    • Zwei Datasets in BigQuery verknüpfen
    • PySpark-Job zur Datenanalyse ausführen

Hinweise

APIs aktivieren

Aktivieren Sie folgende APIs:

Console

Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs aktivieren.

Aktivieren Sie die APIs

gcloud

Aktivieren Sie die Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Berechtigungen erteilen

Gewähren Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen:

Cloud Composer-Umgebung erstellen und vorbereiten

  1. Erstellen Sie eine Cloud Composer-Umgebung mit Standardeinstellung. Parameter:

  2. Weisen Sie dem Dienstkonto, das in Ihrem Cloud Composer-Umgebung, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausgeführt:

    • BigQuery-Nutzer (roles/bigquery.user)
    • BigQuery-Dateninhaber (roles/bigquery.dataOwner)
    • Dienstkontonutzer (roles/iam.serviceAccountUser)
    • Dataproc-Bearbeiter (roles/dataproc.editor)
    • Dataproc-Worker (roles/dataproc.worker)
  1. Leeres BigQuery-Dataset erstellen mit den folgenden Parametern:

    • Name: holiday_weather
    • Region: US
  2. Neuen Cloud Storage-Bucket erstellen in der Multiregion US.

  3. Führen Sie den folgenden Befehl aus, Privaten Google-Zugriff aktivieren im Standardsubnetz in der Region, in der Sie ausführen möchten Serverlose Dataproc-Funktionen für die Ausführung Netzwerkanforderungen. Mi. empfehlen, dieselbe Region wie in Cloud Composer zu verwenden zu verbessern.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Datenverarbeitung mit Dataproc Serverless

PySpark-Beispieljob ansehen

Der folgende Code ist ein PySpark-Beispieljob, der die Temperatur von Zehntelgrad in Celsius in Grad Celsius. Dieser Job erzielt eine Conversion Temperaturdaten aus dem Dataset in ein anderes Format konvertieren.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Unterstützende Dateien in Cloud Storage hochladen

So laden Sie die PySpark-Datei und das in holidays.csv gespeicherte Dataset hoch:

  1. Speichern data_analytics_process.py auf Ihren lokalen Computer übertragen.

  2. Speichern Sie die Datei holidays.csv auf Ihrem lokalen Computer.

  3. Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:

    Zum Cloud Storage Browser

  4. Klicken Sie auf den Namen des Buckets, den Sie zuvor erstellt haben.

  5. Klicken Sie auf dem Tab Objekte des Buckets auf die Schaltfläche Dateien hochladen. wählen Sie data_analytics_process.py und holidays.csv aus dem Dialogfeld aus, angezeigt wird, und klicken Sie auf Öffnen.

Datenanalyse-DAG

Beispiel-DAG ansehen

Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:

  • Die GCSToBigQueryOperator die Datei holidays.csv aus Cloud Storage in eine neue Tabelle in BigQuery holidays_weather-Dataset, das Sie zuvor erstellt haben.

  • Die DataprocCreateBatchOperator einen PySpark-Batch-Job mithilfe von Dataproc Serverless

  • Die BigQueryInsertJobOperator werden die Daten aus holidays.csv „Datum“ Spalte mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d. Die BigQueryInsertJobOperator-Aufgaben sind dynamisch mit einer For-Schleife generiert, und diese Aufgaben befinden sich TaskGroup für eine bessere Lesbarkeit in der Grafikansicht der Airflow-UI.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

Variablen über die Airflow-UI hinzufügen

In Airflow Variablen sind eine universelle Möglichkeit, beliebige Einstellungen als einfachen Schlüssel/Wert-Speicher. Dieser DAG verwendet Airflow-Variablen, gemeinsame Werte speichern. So fügen Sie sie Ihrer Umgebung hinzu:

  1. Greifen Sie über die Cloud Composer-Konsole auf die Airflow-UI zu.

  2. Klicken Sie auf Admin &gt; Variablen.

  3. Fügen Sie die folgenden Variablen hinzu:

    • gcp_project: Ihre Projekt-ID.

    • gcs_bucket: der Name des Buckets, den Sie zuvor erstellt haben (ohne das Präfix gs://).

    • gce_region ist die Region, in der die Dataproc-Job, der die Dataproc Serverless-Netzwerkanforderungen. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.

    • dataproc_service_account: das Dienstkonto für Ihr Cloud Composer-Umgebung. Sie finden diesen Dienst auf dem Tab für die Umgebungskonfiguration Cloud Composer-Umgebung.

DAG in den Bucket Ihrer Umgebung hochladen

Cloud Composer plant DAGs, die sich in der /dags-Ordner im Bucket Ihrer Umgebung. So laden Sie den DAG mithilfe der Google Cloud Console:

  1. Speichern Sie auf Ihrem lokalen Computer data_analytics_dag.py

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  3. Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.

  4. Klicken Sie auf Dateien hochladen.

  5. Wählen Sie data_analytics_dag.py auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.

DAG auslösen

  1. Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.

  2. Klicken Sie auf die DAG-ID data_analytics_dag.

  3. Klicken Sie auf DAG auslösen.

  4. Warten Sie etwa fünf bis zehn Minuten, bis Sie ein grünes Häkchen sehen. Aufgaben erfolgreich abgeschlossen wurden.

Erfolg des DAG validieren

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.

  3. Klicken Sie auf holidays_weather_joined.

  4. Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Beachten Sie, dass die Zahlen in den Wertspalte in Zehntelgrad Celsius angegeben.

  5. Klicken Sie auf holidays_weather_normalized.

  6. Klicken Sie auf „Vorschau“, um die resultierende Tabelle zu sehen. Beachten Sie, dass die Zahlen in den Wertspalte sind in Grad Celsius angegeben.

Im Detail mit Dataproc Serverless (optional)

Sie können eine erweiterte Version dieses DAG mit komplexerem PySpark ausprobieren Datenverarbeitungsablauf. Weitere Informationen finden Sie unter Dataproc-Erweiterung für das Datenanalyse-Beispiel auf GitHub.

Bereinigen

Löschen Sie einzelne Ressourcen, die Sie für diese Anleitung erstellt haben:

Nächste Schritte