Executar um job de contagem de palavras do Hadoop em um cluster do Dataproc

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Neste tutorial, mostramos como usar o Cloud Composer para criar um Gráfico acíclico dirigido (DAG) do Apache Airflow (em inglês) que executa um job de contagem de palavras do Apache Hadoop em um aglomerado.

Objetivos

  1. Acesse seu ambiente do Cloud Composer e use a interface do Airflow.
  2. Crie e visualize variáveis de ambiente do Airflow.
  3. Criar e executar um DAG que inclua as seguintes tarefas:
    1. Cria um Dataproc aglomerado.
    2. Executa um job de contagem de palavras do Apache Hadoop (link em inglês) no cluster.
    3. Gera os resultados da contagem de palavras em um bucket do Cloud Storage.
    4. Exclui o cluster.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  • Verifique se as seguintes APIs estão ativadas no seu projeto:

    Console

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • No seu projeto, criar um bucket do Cloud Storage de qualquer classe e região de armazenamento para armazenar os resultados de contagem de palavras.

  • Observe o caminho do bucket que você criou, por exemplo, gs://example-bucket. Você vai definir uma variável do Airflow para esse caminho e usá-la no DAG de exemplo mais adiante neste tutorial.

  • Crie um ambiente do Cloud Composer com parâmetros padrão. Aguarde até que a criação do ambiente seja concluída. Quando terminar, o uma marca de seleção verde é exibida à esquerda do nome do ambiente.

  • Anote a região onde você criou o ambiente, por exemplo us-central: Você vai definir uma variável do Airflow para essa região e usá-la no DAG de exemplo para executar um cluster do Dataproc na mesma região.

definir variáveis do Airflow

Defina as variáveis do Airflow para usar mais tarde no DAG de exemplo. Por exemplo, é possível definir variáveis do Airflow na interface do Airflow.

Variável do Airflow Valor
gcp_project O ID do projeto que você está usando para este tutorial, como example-project.
gcs_bucket O bucket do URI do Cloud Storage que você criou para este tutorial, como gs://example-bucket..
gce_region A região em que você criou o ambiente, como us-central1. Essa é a região em que o cluster do Dataproc será criado.

Confira o exemplo de fluxo de trabalho

Um DAG do Airflow é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs são definidos em arquivos Python padrão. O código mostrado em hadoop_tutorial.py é o código do fluxo de trabalho.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}


with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operadores

Para orquestrar as três tarefas no fluxo de trabalho de exemplo, o DAG importa as a seguir, estes três operadores do Airflow:

  • DataprocClusterCreateOperator: cria um cluster do Dataproc.

  • DataProcHadoopOperator: envia um job de contagem de palavras do Hadoop e grava os resultados em um bucket do Cloud Storage.

  • DataprocClusterDeleteOperator: exclui o cluster para evitar incorrer cobranças contínuas do Compute Engine.

Dependências

Você organiza as tarefas que deseja executar de forma a refletir relações e dependências. As tarefas neste DAG são executadas sequencialmente.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Programação

O nome do DAG é composer_hadoop_tutorial e ele é executado uma vez por dia. Como o start_date transmitido para a default_dag_args é definido como yesterday, o Cloud Composer programa o fluxo de trabalho comece imediatamente após o upload do DAG no bucket do ambiente.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

faça upload do DAG para o bucket do ambiente

O Cloud Composer armazena DAGs na pasta /dags no bucket do seu ambiente.

Siga estas etapas para fazer o upload do DAG:

  1. Na máquina local, salve hadoop_tutorial.py.

  2. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  3. Na lista de ambientes, na coluna Pasta de DAGs do seu ambiente, clique no link DAGs.

  4. Clique em Enviar arquivos.

  5. Selecione hadoop_tutorial.py na sua máquina local e clique em Abrir.

O Cloud Composer adiciona o DAG ao Airflow e agenda o DAG automaticamente. As alterações no DAG levam de 3 a 5 minutos.

Explorar as execuções do DAG

Conferir o status da tarefa

Quando você faz o upload do arquivo DAG para a pasta dags/ no Cloud Storage, o Cloud Composer analisa o arquivo. Quando concluído, o nome do fluxo de trabalho aparece na lista de DAGs, e o fluxo de trabalho é colocado na fila para ser executado imediatamente.

  1. Para ver o status da tarefa, acesse a interface da Web do Airflow e clique em DAGs na barra de ferramentas.

  2. Para abrir a página de detalhes do DAG, clique em composer_hadoop_tutorial. Isso inclui uma representação gráfica das tarefas do fluxo de trabalho e dependências.

  3. Para ver o status de cada tarefa, clique em Visualização de gráfico e, em seguida, passe o mouse sobre a imagem de cada tarefa.

Enfileirar o fluxo de trabalho novamente

Para executar o fluxo de trabalho novamente a partir da Visualização de gráfico:

  1. Na visualização de gráfico da IU do Airflow, clique no gráfico create_dataproc_cluster.
  2. Para redefinir as três tarefas, clique em Limpar e em OK para confirmar.
  3. Clique em create_dataproc_cluster novamente na visualização de gráfico.
  4. Para enfileirar o fluxo de trabalho novamente, clique em Executar.

Conferir resultados de tarefas

Também é possível verificar o status e os resultados do composer_hadoop_tutorial. do BigQuery acessando as seguintes páginas do console do Google Cloud:

  • Clusters do Dataproc: para monitorar a criação e exclusão de clusters. O cluster criado pelo fluxo de trabalho é temporário: ele existe apenas durante o fluxo de trabalho e é excluído como parte do a última tarefa do fluxo de trabalho.

    Acesse Clusters do Dataproc

  • Jobs do Dataproc: para visualizar ou monitorar o job de contagem de palavras do Apache Hadoop. Clique no ID do job para ver a saída do registro dele.

    Acessar Jobs do Dataproc

  • Navegador do Cloud Storage: para ver os resultados da contagem de palavras no a pasta wordcount no bucket do Cloud Storage que você criou para este tutorial.

    Acesse Navegador do Cloud Storage

Limpeza

Exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer, incluindo a exclusão manual do bucket do ambiente.

  2. Exclua o bucket do Cloud Storage que armazena os resultados do job de contagem de palavras do Hadoop.