Execute uma tarefa de contagem de palavras do Hadoop num cluster do Dataproc

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este tutorial mostra como usar o Cloud Composer para criar um DAG (gráfico acíclico direcionado) do Apache Airflow que executa uma tarefa de contagem de palavras do Apache Hadoop num cluster do Dataproc.

Objetivos

  1. Aceda ao seu ambiente do Cloud Composer e use a IU do Airflow.
  2. Crie e veja variáveis de ambiente do Airflow.
  3. Crie e execute um DAG que inclua as seguintes tarefas:
    1. Cria um cluster do Dataproc.
    2. Executa uma tarefa de contagem de palavras do Apache Hadoop no cluster.
    3. Envia os resultados da contagem de palavras para um contentor do Cloud Storage.
    4. Elimina o cluster.

Custos

Neste documento, usa os seguintes componentes faturáveis do Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.

Os novos Google Cloud utilizadores podem ser elegíveis para uma avaliação gratuita.

Antes de começar

  • Certifique-se de que as seguintes APIs estão ativadas no seu projeto:

    Consola

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • No seu projeto, crie um contentor do Cloud Storage de qualquer classe de armazenamento e região para armazenar os resultados da tarefa de contagem de palavras do Hadoop.

  • Tome nota do caminho do contentor que criou, por exemplo, gs://example-bucket. Vai definir uma variável do Airflow para este caminho e usar a variável no DAG de exemplo mais tarde neste tutorial.

  • Crie um ambiente do Cloud Composer com parâmetros predefinidos. Aguarde até que a criação do ambiente esteja concluída. Quando terminar, a marca de verificação verde é apresentada à esquerda do nome do ambiente.

  • Tenha em atenção a região onde criou o seu ambiente, por exemplo, us-central. Vai definir uma variável do Airflow para esta região e usá-la no DAG de exemplo para executar um cluster do Dataproc na mesma região.

Defina variáveis do Airflow

Defina as variáveis do Airflow para usar mais tarde no DAG de exemplo. Por exemplo, pode definir variáveis do Airflow na IU do Airflow.

Variável de fluxo de ar Valor
gcp_project O ID do projeto que está a usar para este tutorial, como example-project.
gcs_bucket O contentor do URI do Cloud Storage que criou para este tutorial, como gs://example-bucket.
gce_region A região onde criou o seu ambiente, como us-central1. Esta é a região onde o cluster do Dataproc vai ser criado.

Veja o fluxo de trabalho de exemplo

Um DAG do Airflow é uma coleção de tarefas organizadas que quer agendar e executar. Os DAGs são definidos em ficheiros Python padrão. O código apresentado em hadoop_tutorial.py é o código do fluxo de trabalho.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operadores

Para orquestrar as três tarefas no fluxo de trabalho de exemplo, o DAG importa os seguintes três operadores do Airflow:

  • DataprocClusterCreateOperator: cria um cluster do Dataproc.

  • DataProcHadoopOperator: envia uma tarefa de contagem de palavras do Hadoop e escreve os resultados num contentor do Cloud Storage.

  • DataprocClusterDeleteOperator: elimina o cluster para evitar incorrer em cobranças contínuas do Compute Engine.

Dependências

Organiza as tarefas que quer executar de uma forma que reflita as respetivas relações e dependências. As tarefas neste DAG são executadas sequencialmente.

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Agendamento

O nome do DAG é composer_hadoop_tutorial e o DAG é executado uma vez por dia. Uma vez que o start_date transmitido para default_dag_args está definido como yesterday, o Cloud Composer agenda o fluxo de trabalho para começar imediatamente após o DAG ser carregado para o contentor do ambiente.

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Carregue o DAG para o contentor do ambiente

O Cloud Composer armazena DAGs na pasta /dags no contentor do seu ambiente.

Para carregar o DAG:

  1. Na sua máquina local, guarde hadoop_tutorial.py.

  2. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  3. Na lista de ambientes, na coluna Pasta DAGs do seu ambiente, clique no link DAGs.

  4. Clique em Carregar ficheiros.

  5. Selecione hadoop_tutorial.py no seu computador local e clique em Abrir.

O Cloud Composer adiciona o DAG ao Airflow e agenda o DAG automaticamente. As alterações ao DAG ocorrem no prazo de 3 a 5 minutos.

Explore execuções de DAGs

Veja o estado da tarefa

Quando carrega o ficheiro DAG para a pasta dags/ no Cloud Storage, o Cloud Composer analisa o ficheiro. Quando concluído com êxito, o nome do fluxo de trabalho aparece na lista de DAGs e o fluxo de trabalho é colocado em fila para ser executado imediatamente.

  1. Para ver o estado da tarefa, aceda à interface Web do Airflow e clique em DAGs na barra de ferramentas.

  2. Para abrir a página de detalhes do DAG, clique em composer_hadoop_tutorial. Esta página inclui uma representação gráfica das tarefas e das dependências do fluxo de trabalho.

  3. Para ver o estado de cada tarefa, clique em Vista de gráfico e, de seguida, passe o cursor do rato sobre o gráfico de cada tarefa.

Coloque o fluxo de trabalho novamente na fila

Para executar novamente o fluxo de trabalho a partir da vista de gráfico:

  1. Na vista de gráfico da IU do Airflow, clique no gráfico create_dataproc_cluster.
  2. Para repor as três tarefas, clique em Limpar e, de seguida, em OK para confirmar.
  3. Clique novamente em create_dataproc_cluster na vista de gráfico.
  4. Para colocar o fluxo de trabalho novamente na fila, clique em Executar.

Veja os resultados das tarefas

Também pode verificar o estado e os resultados do composer_hadoop_tutorialfluxo de trabalho acedendo às seguintes páginas da Google Cloud consola:

  • Clusters do Dataproc: para monitorizar a criação e a eliminação de clusters. Tenha em atenção que o cluster criado pelo fluxo de trabalho é efémero: só existe durante o fluxo de trabalho e é eliminado como parte da última tarefa do fluxo de trabalho.

    Aceda aos clusters do Dataproc

  • Tarefas do Dataproc: para ver ou monitorizar a tarefa de contagem de palavras do Apache Hadoop. Clique no ID da tarefa para ver o resultado do registo de tarefas.

    Aceda a Tarefas do Dataproc

  • Navegador do Cloud Storage: para ver os resultados da contagem de palavras na pasta wordcount no contentor do Cloud Storage que criou para este tutorial.

    Aceda ao navegador do Cloud Storage

Limpeza

Elimine os recursos usados neste tutorial:

  1. Elimine o ambiente do Cloud Composer, incluindo a eliminação manual do contentor do ambiente.

  2. Elimine o contentor do Cloud Storage que armazena os resultados da tarefa de contagem de palavras do Hadoop.