Execute um DAG de estatísticas de dados no Google Cloud com dados da AWS

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial é uma modificação do artigo Execute um DAG de estatísticas de dados no Google Cloud que mostra como associar o seu ambiente do Cloud Composer aos serviços Web da Amazon para usar os dados aí armazenados. Mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG junta dados de um conjunto de dados público do BigQuery e um ficheiro CSV armazenado num contentor do Amazon Web Services (AWS) S3 e, em seguida, executa uma tarefa em lote do Dataproc Serverless para processar os dados juntos.

O conjunto de dados públicos do BigQuery neste tutorial é o ghcn_d, uma base de dados integrada de resumos climáticos em todo o mundo. O ficheiro CSV contém informações sobre as datas e os nomes dos feriados dos EUA de 1997 a 2021.

A pergunta à qual queremos responder usando o DAG é: "Qual foi a temperatura em Chicago no Dia de Ação de Graças nos últimos 25 anos?"

Objetivos

  • Crie um ambiente do Cloud Composer na configuração predefinida
  • Crie um contentor no AWS S3
  • Crie um conjunto de dados do BigQuery vazio
  • Crie um novo contentor do Cloud Storage
  • Crie e execute um DAG que inclua as seguintes tarefas:
    • Carregue um conjunto de dados externo do S3 para o Cloud Storage
    • Carregue um conjunto de dados externo do Cloud Storage para o BigQuery
    • Junte dois conjuntos de dados no BigQuery
    • Execute uma tarefa PySpark de análise de dados

Antes de começar

Faça a gestão das autorizações na AWS

  1. Crie uma conta da AWS.

  2. Siga a "Secção de criação de políticas com o editor visual" do Tutorial da AWS sobre a criação de políticas de IAM para criar uma política de IAM personalizada para o AWS S3 com a seguinte configuração:

    • Serviço: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets), para ver o seu contentor do S3
    • CreateBucket (s3:CreateBucket), para criar um contentor
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), para criar um contentor
    • ListBucket (s3:ListBucket), para conceder autorização para listar objetos num contentor do S3
    • PutObject (s3:PutObject), para carregar ficheiros para um contentor
    • GetBucketVersioning (s3:GetBucketVersioning), para eliminar um objeto num contentor
    • DeleteObject (s3:DeleteObject), para eliminar um objeto num contentor
    • ListBucketVersions (s3:ListBucketVersions), para eliminar um contentor
    • DeleteBucket (s3:DeleteBucket), para eliminar um contentor
    • Recursos: escolha "Qualquer" junto a "contentor" e "objeto" para conceder autorizações a quaisquer recursos desse tipo.
    • Etiqueta: nenhuma
    • Nome: TutorialPolicy

    Consulte a lista de ações suportadas no Amazon S3 para mais informações acerca de cada configuração apresentada acima.

  3. Adicione a política de IAM TutorialPolicy à sua identidade

Ativar APIs

Ative as seguintes APIs:

Consola

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Conceder autorizações

Conceda as seguintes funções e autorizações à sua conta de utilizador:

Crie e prepare o seu ambiente do Cloud Composer

  1. Crie um ambiente do Cloud Composer com os parâmetros predefinidos:

  2. Conceda as seguintes funções à conta de serviço usada no seu ambiente do Cloud Composer para que os trabalhadores do Airflow executem com êxito as tarefas DAG:

    • Utilizador do BigQuery (roles/bigquery.user)
    • Proprietário dos dados do BigQuery (roles/bigquery.dataOwner)
    • Utilizador da conta de serviço (roles/iam.serviceAccountUser)
    • Editor do Dataproc (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. Instale o apache-airflow-providers-amazon pacote PyPI no seu ambiente do Cloud Composer.

  2. Crie um conjunto de dados do BigQuery vazio com os seguintes parâmetros:

    • Nome: holiday_weather
    • Região: US
  3. Crie um novo contentor do Cloud Storage na multirregião US.

  4. Execute o seguinte comando para ativar o acesso privado à Google na sub-rede predefinida na região onde quer executar o Dataproc Serverless para cumprir os requisitos de rede. Recomendamos que use a mesma região que o seu ambiente do Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Crie um contentor S3 com as predefinições na sua região preferida.

Estabeleça ligação à AWS a partir do Cloud Composer

  1. Obtenha o ID da chave de acesso e a chave de acesso secreta da AWS
  2. Adicione a ligação do AWS S3 através da IU do Airflow:

    1. Aceda a Administração > Ligações.
    2. Crie uma nova associação com a seguinte configuração:

      • ID da associação: aws_s3_connection
      • Tipo de ligação: Amazon S3
      • Extras: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Tratamento de dados com o Dataproc Serverless

Explore o exemplo de tarefa do PySpark

O código apresentado abaixo é um exemplo de uma tarefa do PySpark que converte a temperatura de décimas de grau Celsius para graus Celsius. Esta tarefa converte os dados de temperatura do conjunto de dados num formato diferente.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Carregue o ficheiro PySpark para o Cloud Storage

Para carregar o ficheiro PySpark para o Cloud Storage:

  1. Guarde o ficheiro data_analytics_process.py na sua máquina local.

  2. Na Google Cloud consola, aceda à página do navegador do Cloud Storage:

    Aceda ao navegador do Cloud Storage

  3. Clique no nome do contentor que criou anteriormente.

  4. No separador Objetos do contentor, clique no botão Carregar ficheiros, selecione data_analytics_process.py na caixa de diálogo apresentada e clique em Abrir.

Carregue o ficheiro CSV para o AWS S3

Para carregar o ficheiro holidays.csv:

  1. Guarde holidays.csv na sua máquina local.
  2. Siga o guia da AWS para carregar o ficheiro para o seu contentor.

DAG de análise de dados

Explore o DAG de exemplo

O DAG usa vários operadores para transformar e unificar os dados:

  • O comando S3ToGCSOperator transfere o ficheiro holidays.csv do seu contentor do AWS S3 para o seu contentor do Cloud Storage.

  • O comando GCSToBigQueryOperator carrega o ficheiro holidays.csv do Cloud Storage para uma nova tabela no conjunto de dados holidays_weather do BigQuery que criou anteriormente.

  • O comando DataprocCreateBatchOperator cria e executa uma tarefa em lote do PySpark através do Dataproc sem servidor.

  • O comando BigQueryInsertJobOperator junta os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefas BigQueryInsertJobOperator são geradas dinamicamente através de um ciclo for, e estas tarefas estão num TaskGroup para uma melhor legibilidade na vista de gráfico da IU do Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Use a IU do Airflow para adicionar variáveis

No Airflow, as variáveis são uma forma universal de armazenar e obter definições ou configurações arbitrárias como um simples armazenamento de valores-chave. Este DAG usa variáveis do Airflow para armazenar valores comuns. Para as adicionar ao seu ambiente:

  1. Aceda à IU do Airflow a partir da consola do Cloud Composer.

  2. Aceda a Administração > Variáveis.

  3. Adicione as seguintes variáveis:

    • s3_bucket: o nome do contentor do S3 que criou anteriormente.

    • gcp_project: o ID do seu projeto.

    • gcs_bucket: o nome do contentor que criou anteriormente (sem o prefixo gs://).

    • gce_region: a região onde quer que a sua tarefa do Dataproc cumpra os requisitos de rede do Dataproc Serverless. Esta é a região onde ativou o acesso privado à Google anteriormente.

    • dataproc_service_account: a conta de serviço do seu ambiente do Cloud Composer. Pode encontrar esta conta de serviço no separador de configuração do ambiente do seu ambiente do Cloud Composer.

Carregue o DAG para o contentor do seu ambiente

O Cloud Composer agenda DAGs localizados na pasta /dags no contentor do seu ambiente. Para carregar o DAG através da Google Cloud consola:

  1. Na sua máquina local, guarde o ficheiro s3togcsoperator_tutorial.py.

  2. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  3. Na lista de ambientes, na coluna Pasta DAG, clique no link DAGs. A pasta DAGs do seu ambiente é aberta.

  4. Clique em Carregar ficheiros.

  5. Selecione s3togcsoperator_tutorial.py no seu computador local e clique em Abrir.

Acione o DAG

  1. No seu ambiente do Cloud Composer, clique no separador DAGs.

  2. Clique no ID do DAG s3_to_gcs_dag.

  3. Clique em Acionar DAG.

  4. Aguarde cerca de cinco a dez minutos até ver uma marca de verificação verde a indicar que as tarefas foram concluídas com êxito.

Valide o êxito do DAG

  1. Na Google Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No painel Explorador, clique no nome do projeto.

  3. Clique em holidays_weather_joined.

  4. Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em décimos de grau Celsius.

  5. Clique em holidays_weather_normalized.

  6. Clique em pré-visualizar para ver a tabela resultante. Tenha em atenção que os números na coluna de valor estão em graus Celsius.

Limpeza

Elimine os recursos individuais que criou para este tutorial:

O que se segue?