DAG für Datenanalysen in Google Cloud mit Daten aus AWS ausführen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Diese Anleitung ist eine Modifikation der Anleitung Data Analytics-DAG in Google Cloud ausführen, in der gezeigt wird, wie Sie Ihre Cloud Composer-Umgebung mit Amazon Web Services verbinden, um dort gespeicherte Daten zu verwenden. Darin wird gezeigt, wie Sie mit Cloud Composer einen Apache Airflow-DAG erstellen. Der DAG führt Daten aus einem öffentlichen BigQuery-Dataset und einer CSV-Datei zusammen, die in einem Amazon Web Services (AWS) S3-Bucket gespeichert ist. Anschließend wird ein Dataproc Serverless-Batchjob ausgeführt, um die zusammengeführten Daten zu verarbeiten.

Das öffentliche BigQuery-Dataset in dieser Anleitung ist ghcn_d, eine integrierte Datenbank mit Klimazusammenfassungen auf der ganzen Welt. Die CSV-Datei enthält Informationen zu den Daten und Namen von US-Feiertagen von 1997 bis 2021.

Mit dem DAG möchten wir die Frage beantworten: „Wie warm war es in Chicago an Thanksgiving in den letzten 25 Jahren?“

Lernziele

  • Cloud Composer-Umgebung in der Standardkonfiguration erstellen
  • Bucket in AWS S3 erstellen
  • Leeres BigQuery-Dataset erstellen
  • Neuen Cloud Storage-Bucket erstellen
  • Erstellen und führen Sie einen DAG mit den folgenden Aufgaben aus:
    • Externen Datensatz aus S3 in Cloud Storage laden
    • Externes Dataset aus Cloud Storage in BigQuery laden
    • Zwei Datasets in BigQuery zusammenführen
    • PySpark-Job für Datenanalyse ausführen

Hinweis

Berechtigungen in AWS verwalten

  1. Erstellen Sie ein AWS-Konto.

  2. Folgen Sie dem Abschnitt „Richtlinien mit dem visuellen Editor erstellen“ in der Anleitung zum Erstellen von IAM-Richtlinien in AWS, um eine benutzerdefinierte IAM-Richtlinie für AWS S3 mit der folgenden Konfiguration zu erstellen:

    • Dienst:S3
    • ListAllMyBuckets (s3:ListAllMyBuckets): Zeigt Ihren S3-Bucket an.
    • CreateBucket (s3:CreateBucket): zum Erstellen eines Buckets
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls) zum Erstellen eines Buckets
    • ListBucket (s3:ListBucket): Gewährt die Berechtigung zum Auflisten von Objekten in einem S3-Bucket.
    • PutObject (s3:PutObject) zum Hochladen von Dateien in einen Bucket
    • GetBucketVersioning (s3:GetBucketVersioning) zum Löschen eines Objekts in einem Bucket
    • DeleteObject (s3:DeleteObject): zum Löschen eines Objekts in einem Bucket
    • ListBucketVersions (s3:ListBucketVersions) zum Löschen eines Buckets
    • DeleteBucket (s3:DeleteBucket): zum Löschen eines Buckets
    • Ressourcen:Wählen Sie neben „Bucket“ und „Objekt“ die Option „Beliebig“ aus, um allen Ressourcen dieses Typs Berechtigungen zu gewähren.
    • Tag:Keines
    • Name:TutorialPolicy

    Weitere Informationen zu den einzelnen Konfigurationen oben finden Sie in der Liste der in Amazon S3 unterstützten Aktionen.

  3. Fügen Sie Ihrer Identität die IAM-Richtlinie TutorialPolicy hinzu.

APIs aktivieren

Aktivieren Sie folgende APIs:

Console

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Berechtigungen erteilen

Weisen Sie Ihrem Nutzerkonto die folgenden Rollen und Berechtigungen zu:

Cloud Composer-Umgebung erstellen und vorbereiten

  1. Erstellen Sie eine Cloud Composer-Umgebung mit den Standardparametern:

  2. Weisen Sie dem Dienstkonto, das in Ihrer Cloud Composer-Umgebung verwendet wird, die folgenden Rollen zu, damit die Airflow-Worker DAG-Aufgaben erfolgreich ausführen können:

    • BigQuery-Nutzer (roles/bigquery.user)
    • BigQuery-Dateninhaber (roles/bigquery.dataOwner)
    • Dienstkontonutzer (roles/iam.serviceAccountUser)
    • Dataproc-Bearbeiter (roles/dataproc.editor)
    • Dataproc-Worker (roles/dataproc.worker)
  1. Installieren Sie das apache-airflow-providers-amazon PyPI-Paket in Ihrer Cloud Composer-Umgebung.

  2. Erstellen Sie ein leeres BigQuery-Dataset mit den folgenden Parametern:

    • Name: holiday_weather
    • Region: US
  3. Erstellen Sie einen neuen Cloud Storage-Bucket in der Multi-Region US.

  4. Führen Sie den folgenden Befehl aus, um privaten Google-Zugriff im Standardsubnetz in der Region zu aktivieren, in der Sie Dataproc Serverless ausführen möchten, um die Netzwerkanforderungen zu erfüllen. Wir empfehlen, dieselbe Region wie für Ihre Cloud Composer-Umgebung zu verwenden.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Erstellen Sie einen S3-Bucket mit den Standardeinstellungen in Ihrer bevorzugten Region.

Verbindung von Cloud Composer zu AWS herstellen

  1. AWS-Zugriffsschlüssel-ID und Secret-Zugriffsschlüssel abrufen
  2. Fügen Sie die AWS S3-Verbindung über die Airflow-Benutzeroberfläche hinzu:

    1. Klicken Sie auf Verwaltung > Verbindungen.
    2. Erstellen Sie eine neue Verbindung mit der folgenden Konfiguration:

      • Verbindungs-ID:aws_s3_connection
      • Verbindungstyp:Amazon S3
      • Extras:{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Datenverarbeitung mit Dataproc Serverless

Beispiel für einen PySpark-Job ansehen

Der folgende Code ist ein Beispiel für einen PySpark-Job, mit dem Temperaturen von Zehntelgraden Celsius in Grad Celsius umgewandelt werden. Mit diesem Job werden Temperaturdaten aus dem Datensatz in ein anderes Format konvertiert.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

PySpark-Datei in Cloud Storage hochladen

So laden Sie die PySpark-Datei in Cloud Storage hoch:

  1. Speichern Sie data_analytics_process.py auf Ihrem lokalen Computer.

  2. Rufen Sie in der Google Cloud Console die Seite Cloud Storage-Browser auf:

    Zum Cloud Storage Browser

  3. Klicken Sie auf den Namen des zuvor erstellten Buckets.

  4. Klicken Sie auf dem Tab Objekte für den Bucket auf die Schaltfläche Dateien hochladen, wählen Sie im angezeigten Dialogfeld data_analytics_process.py aus und klicken Sie auf Öffnen.

CSV-Datei in AWS S3 hochladen

So laden Sie die holidays.csv-Datei hoch:

  1. Speichern Sie holidays.csv auf Ihrem lokalen Computer.
  2. Folgen Sie der AWS-Anleitung, um die Datei in Ihren Bucket hochzuladen.

DAG für Datenanalyse

Beispiel-DAG ansehen

Der DAG verwendet mehrere Operatoren, um die Daten zu transformieren und zu vereinheitlichen:

  • Mit S3ToGCSOperator wird die Datei holidays.csv aus Ihrem AWS S3-Bucket in Ihren Cloud Storage-Bucket übertragen.

  • Mit GCSToBigQueryOperator wird die Datei holidays.csv aus Cloud Storage in eine neue Tabelle im BigQuery-Dataset holidays_weather aufgenommen, das Sie zuvor erstellt haben.

  • Mit DataprocCreateBatchOperator wird ein PySpark-Batchjob mit Dataproc Serverless erstellt und ausgeführt.

  • Mit BigQueryInsertJobOperator werden die Daten aus holidays.csv in der Spalte „Datum“ mit Wetterdaten aus dem öffentlichen BigQuery-Dataset ghcn_d zusammengeführt. Die BigQueryInsertJobOperator-Aufgaben werden dynamisch mit einer For-Schleife generiert und befinden sich in einer TaskGroup für eine bessere Lesbarkeit in der Grafikansicht der Airflow-Benutzeroberfläche.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Variablen über die Airflow-Benutzeroberfläche hinzufügen

In Airflow sind Variablen eine universelle Möglichkeit, beliebige Einstellungen oder Konfigurationen als einfaches Schlüssel/Wert-Verzeichnis zu speichern und abzurufen. In diesem DAG werden Airflow-Variablen zum Speichern gängiger Werte verwendet. So fügen Sie sie Ihrer Umgebung hinzu:

  1. Über die Cloud Composer-Konsole auf die Airflow-Benutzeroberfläche zugreifen

  2. Klicken Sie auf Verwaltung > Variablen.

  3. Fügen Sie die folgenden Variablen hinzu:

    • s3_bucket: Der Name des zuvor erstellten S3-Buckets.

    • gcp_project: Ihre Projekt-ID.

    • gcs_bucket: der Name des zuvor erstellten Buckets (ohne das Präfix gs://).

    • gce_region: die Region, in der Sie Ihren Dataproc-Job ausführen möchten, der die Anforderungen an die Dataproc Serverless-Netzwerkkonfiguration erfüllt. Das ist die Region, in der Sie den privaten Google-Zugriff zuvor aktiviert haben.

    • dataproc_service_account: das Dienstkonto für Ihre Cloud Composer-Umgebung. Sie finden dieses Dienstkonto auf dem Tab „Umgebungskonfiguration“ Ihrer Cloud Composer-Umgebung.

DAG in den Bucket Ihrer Umgebung hochladen

Cloud Composer plant DAGs, die sich im Ordner /dags des Buckets Ihrer Umgebung befinden. So laden Sie den DAG mit der Google Cloud Console hoch:

  1. Speichern Sie s3togcsoperator_tutorial.py auf Ihrem lokalen Computer.

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  3. Klicken Sie in der Liste der Umgebungen in der Spalte DAG-Ordner auf den Link DAGs. Der DAGs-Ordner Ihrer Umgebung wird geöffnet.

  4. Klicken Sie auf Dateien hochladen.

  5. Wählen Sie s3togcsoperator_tutorial.py auf Ihrem lokalen Computer aus und klicken Sie auf Öffnen.

DAG auslösen

  1. Klicken Sie in Ihrer Cloud Composer-Umgebung auf den Tab DAGs.

  2. Klicken Sie auf die DAG-ID s3_to_gcs_dag.

  3. Klicken Sie auf DAG auslösen.

  4. Warten Sie etwa fünf bis zehn Minuten, bis ein grünes Häkchen angezeigt wird, das signalisiert, dass die Aufgaben erfolgreich abgeschlossen wurden.

Erfolg des DAG prüfen

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Klicken Sie im Explorer-Bereich auf den Namen Ihres Projekts.

  3. Klicken Sie auf holidays_weather_joined.

  4. Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Die Zahlen in der Spalte „Wert“ sind in Zehntelgrad Celsius angegeben.

  5. Klicken Sie auf holidays_weather_normalized.

  6. Klicken Sie auf „Vorschau“, um die resultierende Tabelle aufzurufen. Die Zahlen in der Spalte „Wert“ sind in Grad Celsius angegeben.

Bereinigen

Löschen Sie die einzelnen Ressourcen, die Sie für diese Anleitung erstellt haben:

Nächste Schritte