Execute um DAG de análise de dados no Google Cloud usando dados da AWS

Cloud Composer 1 | Cloud Composer 2

Este tutorial é uma modificação do tópico Executar um DAG de análise de dados no Google Cloud que mostra como conectar seu ambiente do Cloud Composer ao Amazon Web Services para utilizar os dados armazenados nele. Ele mostra como usar o Cloud Composer para criar um DAG do Apache Airflow. O DAG mescla dados de um conjunto de dados público do BigQuery e um arquivo CSV armazenado em um bucket do Amazon Web Services (AWS) S3 e, em seguida, executa um job em lote do Dataproc sem servidor para processar os dados mesclados.

O conjunto de dados público do BigQuery neste tutorial é ghcn_d, um banco de dados integrado de resumos climáticos de todo o mundo. O arquivo CSV contém informações sobre as datas e os nomes dos feriados nos EUA de 1997 a 2021.

A pergunta que queremos responder usando o DAG é: "Qual foi o nível de calor em Chicago no Dia de Ação de Graças nos últimos 25 anos?"

Objetivos

  • Criar um ambiente do Cloud Composer na configuração padrão
  • Criar um bucket no AWS S3
  • Criar um conjunto de dados vazio do BigQuery
  • Crie um novo bucket do Cloud Storage
  • Crie e execute um DAG que inclua as seguintes tarefas:
    • Carregar um conjunto de dados externo do S3 para o Cloud Storage
    • Carregar um conjunto de dados externo do Cloud Storage para o BigQuery
    • Mesclar dois conjuntos de dados no BigQuery
    • Executar um job de análise de dados do PySpark

Antes de começar

Gerenciar permissões na AWS

  1. Crie uma conta da AWS.

  2. Siga a seção "Como criar políticas com o editor visual" do tutorial Como criar políticas de IAM da AWS para criar uma política de IAM personalizada para o AWS S3 com a seguinte configuração:

    • Serviço:S3
    • ListAllMyBuckets (s3:ListAllMyBuckets), para visualizar o bucket S3
    • CreateBucket (s3:CreateBucket), para a criação de um bucket
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), para a criação de um bucket
    • ListBucket (s3:ListBucket), para conceder permissão para listar objetos em um bucket do S3
    • PutObject (s3:PutObject), para fazer upload de arquivos em um bucket
    • GetBucketVersioning (s3:GetBucketVersioning), para excluir um objeto em um bucket
    • DeleteObject (s3:DeleteObject), para excluir um objeto em um bucket
    • ListBucketVersions (s3:ListBucketVersions), para excluir um bucket
    • DeleteBucket (s3:DeleteBucket), para excluir um bucket
    • Recursos:escolha "Qualquer" ao lado de "bucket" e "objeto" para conceder permissões a qualquer recurso desse tipo.
    • Tag:nenhuma
    • Nome:TutorialPolicy

    Consulte a lista de ações compatíveis com o Amazon S3 para mais informações sobre cada configuração encontrada acima.

  3. Adicionar a política de IAM TutorialPolicy à sua identidade

Ativar APIs

Ative as APIs a seguir:

Console

Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage.

Ative as APIs

gcloud

Ative as APIs Dataproc, Cloud Composer, BigQuery, Cloud Storage:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Conceder permissões

Conceda os seguintes papéis e permissões à sua conta de usuário:

Criar e preparar o ambiente do Cloud Composer

  1. Crie um ambiente do Cloud Composer com parâmetros padrão:

  2. Conceda os papéis a seguir à conta de serviço usada no ambiente do Cloud Composer para que os workers do Airflow possam executar as tarefas do DAG:

    • Usuário do BigQuery (roles/bigquery.user)
    • Proprietário de dados do BigQuery (roles/bigquery.dataOwner)
    • Usuário da conta de serviço (roles/iam.serviceAccountUser)
    • Editor do Dataproc (roles/dataproc.editor)
    • Worker do Dataproc (roles/dataproc.worker)
  1. Instale o apache-airflow-providers-amazon pacote PyPI no ambiente do Cloud Composer.

  2. Crie um conjunto de dados vazio do BigQuery com os seguintes parâmetros:

    • Name: holiday_weather
    • Região: US
  3. Crie um novo bucket do Cloud Storage na multirregião US.

  4. Execute o comando a seguir para ativar o acesso privado do Google na sub-rede padrão na região onde você gostaria de executar o Dataproc sem servidor para atender aos requisitos de rede. Recomendamos usar a mesma região do seu ambiente do Cloud Composer.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Crie um bucket do S3 com configurações padrão na região de sua preferência.

Conectar-se à AWS pelo Cloud Composer

  1. Consiga o ID da chave de acesso da AWS e a chave de acesso secreta
  2. Adicione a conexão do AWS S3 usando a IU do Airflow:

    1. Acesse Administrador > Conexões.
    2. Crie uma nova conexão com a seguinte configuração:

      • ID da conexão: aws_s3_connection
      • Tipo de conexão: Amazon S3
      • Extras: {"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}

Processamento de dados usando Dataproc sem servidor

Analise o job de exemplo do PySpark

O código mostrado abaixo é um exemplo de job do PySpark que converte a temperatura de décimos de grau em Celsius para graus Celsius. Esse job converte dados de temperatura do conjunto de dados em um formato diferente.

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

faça upload do arquivo do PySpark para o Cloud Storage

Para fazer o upload do arquivo PySpark para o Cloud Storage:

  1. Salve o data_analytics_process.py na sua máquina local.

  2. No console do Google Cloud, acesse a página Navegador do Cloud Storage:

    Acessar o navegador do Cloud Storage

  3. Clique no nome do bucket criado anteriormente.

  4. Na guia Objetos do bucket, clique no botão Fazer upload de arquivos, selecione data_analytics_process.py na caixa de diálogo exibida e clique em Abrir.

Faça upload do arquivo CSV para o AWS S3

Para fazer upload do arquivo holidays.csv:

  1. Economize holidays.csv na sua máquina local.
  2. Siga o guia da AWS para fazer upload do arquivo no bucket.

DAG de análise de dados

Analise o DAG de exemplo

O DAG usa vários operadores para transformar e unificar os dados:

  • O S3ToGCSOperator transfere o arquivo holidays.csv do bucket do AWS S3 para o bucket do Cloud Storage.

  • O GCSToBigQueryOperator ingere o arquivo holidays.csv do Cloud Storage para uma nova tabela no conjunto de dados holidays_weather do BigQuery criado anteriormente.

  • O DataprocCreateBatchOperator cria e executa um job em lote do PySpark usando o Dataproc sem servidor.

  • O BigQueryInsertJobOperator mescla os dados de holidays.csv na coluna "Date" com os dados meteorológicos do conjunto de dados público do BigQuery ghcn_d. As tarefas BigQueryInsertJobOperator são geradas dinamicamente usando um loop "for" e estão em um TaskGroup para facilitar a leitura na visualização de gráfico da interface do Airflow.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Usar a interface do Airflow para adicionar variáveis

No Airflow, as variáveis são uma maneira universal de armazenar e recuperar configurações ou configurações arbitrárias como um armazenamento simples de chave-valor. Esse DAG usa variáveis do Airflow para armazenar valores comuns. Para adicioná-los ao seu ambiente:

  1. Acesse a interface do Airflow pelo console do Cloud Composer.

  2. Acesse Administrador > Variáveis.

  3. Adicione as seguintes variáveis:

    • s3_bucket: o nome do bucket S3 que você criou anteriormente.

    • gcp_project: o ID do projeto.

    • gcs_bucket: o nome do bucket que você criou anteriormente (sem o prefixo gs://).

    • gce_region: a região em que você quer que o job do Dataproc atenda aos requisitos de rede sem servidor do Dataproc. Esta é a região onde você ativou o Acesso privado do Google anteriormente.

    • dataproc_service_account: a conta de serviço do ambiente do Cloud Composer. Essa conta de serviço está na guia de configuração do ambiente do Cloud Composer.

faça upload do DAG para o bucket do ambiente

O Cloud Composer programa os DAGs localizados na pasta /dags no bucket do ambiente. Para fazer o upload do DAG usando o console do Google Cloud:

  1. Na máquina local, salve s3togcsoperator_tutorial.py.

  2. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, na coluna pasta do DAG, clique no link DAGs. A pasta de DAGs do seu ambiente será aberta.

  4. Clique em Fazer o upload dos arquivos.

  5. Selecione s3togcsoperator_tutorial.py na máquina local e clique em Abrir.

Acione o DAG

  1. No seu ambiente do Cloud Composer, clique na guia DAGs.

  2. Clique no ID do DAG s3_to_gcs_dag.

  3. Clique em Acionar DAG.

  4. Aguarde cerca de cinco a dez minutos até ver uma marca de seleção verde indicando que as tarefas foram concluídas com êxito.

Validar o sucesso do DAG

  1. No console do Google Cloud, acesse a página do BigQuery.

    Acessar o BigQuery

  2. No painel Explorer, clique no nome do projeto.

  3. Clique em holidays_weather_joined.

  4. Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em décimos de grau Celsius.

  5. Clique em holidays_weather_normalized.

  6. Clique em "Visualizar" para conferir a tabela resultante. Observe que os números na coluna de valor estão em graus Celsius.

limpeza

Exclua os recursos individuais criados para este tutorial:

A seguir