Dataproc 클러스터에서 Hadoop 워드카운트 작업 실행

Cloud Composer 1 | Cloud Composer 2

이 튜토리얼에서는 Cloud Composer를 사용하여 Dataproc 클러스터에서 Apache Hadoop 워드카운트 작업을 실행하는 Apache Airflow DAG(방향성 비순환 그래프)를 만드는 방법을 보여줍니다.

목표

  1. Cloud Composer 환경에 액세스하여 Airflow UI를 사용하기
  2. Airflow 환경 변수를 만들고 열람하기
  3. 다음 작업이 포함된 DAG를 만들고 실행하기
    1. Dataproc 클러스터를 만듭니다.
    2. 클러스터에서 Apache Hadoop 워드카운트 작업을 실행합니다.
    3. 워드카운트 결과를 Cloud Storage 버킷에 출력합니다.
    4. 클러스터를 삭제합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Cloud Composer
  • Dataproc
  • Cloud Storage

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  • 프로젝트에 다음 API가 사용 설정되어 있는지 확인합니다.

    콘솔

    API Dataproc, Cloud Storage 사용 설정

    API 사용 설정

    gcloud

    Dataproc, Cloud Storage API를 사용 설정합니다.

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • 프로젝트에서 스토리지 클래스와 리전에 상관없이 Hadoop 워드카운트 작업 결과를 저장할 Cloud Storage 버킷을 만듭니다.

  • 만든 버킷의 경로를 기록합니다(예: gs://example-bucket). 이 경로에 Airflow 변수를 정의하고 이 튜토리얼 후반부의 DAG 예시에서 변수를 사용합니다.

  • 기본 매개변수로 Cloud Composer 환경을 만듭니다. 환경 생성이 완료될 때까지 기다립니다. 완료되면 환경 이름 왼쪽에 녹색 체크표시가 표시됩니다.

  • 환경을 만든 리전을 기록합니다(예: us-central). 이 리전의 Airflow 변수를 정의하고 예시 DAG에서 사용하여 같은 리전에서 Dataproc 클러스터를 실행합니다.

Airflow 변수 설정

나중에 예시 DAG에서 사용할 Airflow 변수를 설정합니다. 예를 들어 Airflow UI에서 Airflow 변수를 설정할 수 있습니다.

Airflow 변수
gcp_project 이 튜토리얼에서 사용하는 프로젝트의 프로젝트 ID입니다(예: example-project).
gcs_bucket 이 튜토리얼용으로 만든 URI Cloud Storage 버킷입니다(예: gs://example-bucket.).
gce_region 환경을 만든 리전입니다(예: us-central1). 이 리전은 Dataproc 클러스터를 만들 리전입니다.

예시 워크플로 보기

Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다. hadoop_tutorial.py에 나와 있는 코드는 워크플로 코드입니다.

Airflow 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

연산자

예시 워크플로에서 태스크 3개를 조정하기 위해 DAG는 다음 Airflow 연산자 3개를 가져옵니다.

  • DataprocClusterCreateOperator: Dataproc 클러스터를 만듭니다.

  • DataProcHadoopOperator: Hadoop 워드카운트 작업을 제출하고 결과를 Cloud Storage 버킷에 기록합니다.

  • DataprocClusterDeleteOperator: Compute Engine 요금이 청구되지 않도록 클러스터를 삭제합니다.

종속 항목

실행할 태스크를 관계 및 종속 항목을 반영하는 방식으로 구성합니다. 이 DAG의 태스크는 순차적으로 실행됩니다.

Airflow 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Airflow 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

예약

DAG 이름은 composer_hadoop_tutorial이며 DAG는 매일 1회 실행됩니다. default_dag_args로 전달된 start_dateyesterday로 설정되어 있으므로 Cloud Composer는 DAG가 환경 버킷에 업로드된 직후에 워크플로가 시작되도록 예약합니다.

Airflow 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

환경 버킷에 DAG 업로드

Cloud Composer는 환경 버킷의 /dags 폴더에 DAG를 저장합니다.

DAG를 업로드하려면 다음 안내를 따르세요.

  1. 로컬 머신에서 hadoop_tutorial.py를 저장합니다.

  2. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  3. 환경 목록에 있는 환경의 DAG 폴더 열에서 DAG 링크를 클릭합니다.

  4. 파일 업로드를 클릭합니다.

  5. 로컬 머신에서 hadoop_tutorial.py를 선택하고 열기를 클릭합니다.

Cloud Composer는 DAG를 Airflow에 추가하고 DAG를 자동으로 예약합니다. DAG 변경사항은 3~5분 이내에 적용됩니다.

DAG 실행 살펴보기

태스크 상태 보기

DAG 파일을 Cloud Storage의 dags/ 폴더에 업로드하면 Cloud Composer가 파일을 파싱합니다. 작업이 성공적으로 완료되면 워크플로 이름이 DAG 목록에 나타나고 워크플로가 즉시 실행되도록 큐에 추가됩니다.

  1. 태스크 상태를 보려면 Airflow 웹 인터페이스로 이동하고 툴바에서 DAGs를 클릭합니다.

  2. DAG 세부정보 페이지를 열려면 composer_hadoop_tutorial을 클릭합니다. 이 페이지에서는 워크플로 태스크와 종속 항목이 그래픽으로 표시됩니다.

  3. 각 태스크의 상태를 확인하려면 그래프 뷰를 클릭한 후 각 태스크의 그래픽에 마우스를 가져갑니다.

워크플로를 다시 큐에 추가

그래프 뷰에서 워크플로를 다시 실행하려면 다음 안내를 따르세요.

  1. Airflow UI 그래프 뷰에서 create_dataproc_cluster 그래픽을 클릭합니다.
  2. 태스크 세 개를 재설정하려면 삭제를 클릭한 후 확인을 클릭하여 확인합니다.
  3. 그래프 뷰에서 create_dataproc_cluster를 다시 클릭합니다.
  4. 워크플로를 다시 큐에 추가하려면 실행을 클릭합니다.

태스크 결과 보기

다음 Google Cloud 콘솔 페이지로 이동하여 composer_hadoop_tutorial 워크플로의 상태와 결과를 확인할 수도 있습니다.

  • Dataproc 클러스터: 클러스터 생성 및 삭제를 모니터링합니다. 워크플로에서 만든 클러스터는 수명이 짧습니다. 즉, 워크플로 지속시간 동안에만 존재하며 마지막 워크플로 태스크의 일부로 삭제됩니다.

    Dataproc 클러스터로 이동

  • Dataproc 작업: Apache Hadoop 워드카운트 작업을 보거나 모니터링합니다. 작업 ID를 클릭하여 작업 로그 출력을 확인합니다.

    Dataproc 작업으로 이동

  • Cloud Storage 브라우저: 이 튜토리얼에서 만든 Cloud Storage 버킷의 wordcount 폴더에서 워드카운트 결과를 확인합니다.

    Cloud Storage 브라우저로 이동

삭제

이 튜토리얼에서 사용된 리소스를 삭제합니다.

  1. 환경 버킷을 수동으로 삭제하는 등 Cloud Composer 환경을 삭제합니다.

  2. Hadoop 워드카운트 작업 결과를 저장하는 Cloud Storage 버킷을 삭제합니다.