Dataproc 클러스터에서 Hadoop 워드카운트 작업 실행

이 가이드에서는 Google Cloud Console을 통해 Cloud Composer를 사용하여 Dataproc 클러스터에서 Apache Hadoop 워드카운트 작업을 실행하는 Apache Airflow DAG(워크플로)를 만드는 방법을 설명합니다.

목표

  1. Cloud Composer 환경에 액세스하고 Airflow 웹 인터페이스를 사용합니다.
  2. Airflow 환경 변수를 만들고 봅니다.
  3. 다음 작업이 포함된 DAG를 만들고 실행합니다.
    1. Dataproc 클러스터를 만듭니다.
    2. 클러스터에서 Apache Hadoop 워드카운트 작업을 실행합니다.
    3. 워드카운트 결과를 Cloud Storage 버킷에 출력합니다.
    4. 클러스터를 삭제합니다.

비용

이 가이드에서는 다음과 같은 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.

  • Cloud Composer
  • Dataproc
  • Cloud Storage

시스템에서 개발자 환경을 만드는 데 최대 25분이 걸립니다. 이 가이드를 완료하는 데 약 1시간 정도 걸릴 수 있습니다. 가격 계산기를 사용하여 예상 사용량을 기준으로 예상 비용을 산출합니다. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Cloud Console의 프로젝트 선택기 페이지에서 Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Cloud Composer, Cloud Dataproc, and Cloud Storage API를 사용 설정합니다.

    API 사용 설정

  5. 프로젝트에서 스토리지 클래스와 리전에 상관없이 Hadoop 워드카운트 작업 결과를 저장할 Cloud Storage 버킷을 만듭니다.
  6. 만든 버킷의 경로를 기록합니다(예: gs://my-bucket). 이 경로에 Airflow 변수를 정의하여 예시 DAG에서 사용합니다.

환경 만들기

  1. Cloud Console에서 환경 만들기 페이지로 이동합니다.

    환경 만들기 페이지 열기

  2. 이름 필드에 example-environment를 입력합니다.

  3. 위치 드롭다운 목록에서 Cloud Composer 환경의 리전을 선택합니다. 리전 선택에 대한 자세한 내용은 사용 가능한 리전을 참조하세요.

  4. 다른 환경 구성 옵션에 제공된 기본값을 사용합니다.

  5. 환경을 만들려면 만들기를 클릭합니다.

  6. 환경 생성이 완료될 때까지 기다립니다. 작업이 완료되면 환경 이름의 왼쪽에 녹색 체크표시가 표시됩니다.

환경 세부정보 보기

환경 생성이 완료되면 Cloud Composer 및 Python 버전, Airflow 웹 인터페이스의 URL, Google Kubernetes Engine 클러스터 ID와 같은 환경의 배포 정보를 볼 수 있습니다.

배포 정보를 보려면 다음 안내를 따르세요.

  1. Cloud Console에서 환경 페이지로 이동합니다.

    환경 페이지 열기

  2. 환경 세부정보 페이지를 보려면 example-environment를 클릭합니다.

  3. 환경을 만든 영역을 기록합니다(예: us-central-1c). 이 영역에 Airflow 변수를 정의하여 예시 DAG에서 사용합니다.

Airflow 변수 설정

Airflow 변수환경 변수와 다른 Airflow 관련 개념입니다. 이 단계에서는 Airflow 웹 인터페이스를 통해 나중에 예시 DAG에서 사용할 Airflow 변수 세 개를 설정합니다.

변수를 설정하려면 다음 안내를 따르세요.

  1. Cloud Console에서 Airflow 웹 인터페이스에 액세스합니다.

    1. Cloud Console에서 환경 페이지로 이동합니다.

      환경 페이지 열기

    2. example-environmentAirflow 웹 서버 열에서 Airflow 링크를 클릭합니다. Airflow 웹 인터페이스가 새 창에서 열립니다.

  2. Airflow 웹 인터페이스에서 변수를 설정합니다.

    1. 툴바에서 관리 > 변수를 클릭합니다.
    2. 새 변수를 만들려면 만들기를 클릭합니다.
    3. 다음 변수 각각에 키-값 쌍을 입력하고 저장을 클릭합니다. 모든 Airflow 변수가 목록 탭에 표시됩니다.
      gcp_project 이 가이드를 위해 사용 중인 composer-test와 같은 Google Cloud Platform 프로젝트의 프로젝트 ID입니다.
      gcs_bucket 이 가이드용으로 만든 Cloud Storage 버킷(예: gs://my-bucket)
      gce_zone 환경에 사용할 Compute Engine 영역(예: us-central1-c). Dataproc 클러스터를 만들 영역입니다. 사용 가능한 리전 및 영역을 참조하세요.

예시 워크플로 보기

Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다. hadoop_tutorial.py에 나와 있는 코드는 워크플로 코드입니다.

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

연산자

연산자는 워크플로의 단일 태스크용 템플릿입니다. 예시 워크플로에서 태스크 세 개를 조정하기 위해 DAG는 다음 연산자 세 개를 가져옵니다.

  1. DataprocClusterCreateOperator: Dataproc 클러스터를 만듭니다.
  2. DataProcHadoopOperator: Hadoop 워드카운트 작업을 제출하고 결과를 Cloud Storage 버킷에 기록합니다.
  3. DataprocClusterDeleteOperator: Compute Engine 요금이 청구되지 않도록 클러스터를 삭제합니다.

종속 항목

실행할 태스크를 관계 및 종속 항목을 반영하는 방식으로 구성합니다. 이 DAG의 태스크는 순차적으로 실행됩니다. 이 예시에서 관계는 Python bitshift 연산자가 가리키는 방향으로 설정됩니다(>>).

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

예약

DAG 이름은 composer_hadoop_tutorial이며 매일 1회 실행됩니다. default_dag_args로 전달된 start_dateyesterday로 설정되어 있으므로 Cloud Composer는 DAG 업로드 직후에 워크플로가 시작되도록 예약합니다.

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Cloud Storage에 DAG 업로드

Cloud Composer는 DAG 폴더의 DAG만 예약합니다. DAG 폴더는 Cloud Composer가 환경에 맞게 자동으로 만드는 Cloud Storage 버킷에 위치합니다.

DAG를 업로드하려면 다음 안내를 따르세요.

  1. 로컬 머신에서 hadoop_tutorial.py를 저장합니다.
  2. Cloud Console에서 환경 페이지로 이동합니다.

    환경 페이지 열기

  3. example-environmentDAGs 폴더 열에서 DAGs 링크를 클릭합니다. Cloud Storage의 DAG 폴더가 열립니다.

  4. 파일 업로드를 클릭합니다.

  5. 로컬 머신에서 hadoop_tutorial.py를 선택하고 열기를 클릭합니다.

Cloud Composer는 DAG를 Airflow에 추가하고 DAG를 자동으로 예약합니다. DAG 변경사항은 3~5분 이내에 적용됩니다.

DAG 실행 탐색

태스크 상태 보기

DAG 파일을 Cloud Storage의 dags/ 폴더에 업로드하면 Cloud Composer가 파일을 파싱합니다. 작업이 성공적으로 완료되면 워크플로 이름이 DAG 목록에 나타나고 워크플로가 즉시 실행되도록 큐에 추가됩니다.

  1. 태스크 상태를 보려면 Airflow 웹 인터페이스로 이동하고 툴바에서 DAGs를 클릭합니다.

  2. DAG 세부정보 페이지를 열려면 composer_hadoop_tutorial을 클릭합니다. 이 페이지에서는 워크플로 태스크와 종속 항목이 그래픽으로 표시됩니다.

  3. 각 태스크의 상태를 확인하려면 그래프 뷰를 클릭한 후 각 태스크의 그래픽에 마우스를 가져갑니다.

워크플로를 다시 큐에 추가

그래프 뷰에서 워크플로를 다시 실행하려면 다음 안내를 따르세요.

  1. Airflow UI 그래프 뷰에서 create_dataproc_cluster 그래픽을 클릭합니다.
  2. 태스크 세 개를 재설정하려면 삭제를 클릭한 후 확인을 클릭하여 확인합니다.
  3. 그래프 뷰에서 create_dataproc_cluster를 다시 클릭합니다.
  4. 워크플로를 다시 큐에 추가하려면 실행을 클릭합니다.

태스크 결과 보기

다음 Cloud Console 페이지로 이동하여 composer_hadoop_tutorial 워크플로의 상태와 결과를 확인할 수도 있습니다.

  • 클러스터 생성 및 삭제를 모니터링할 Dataproc 클러스터. 워크플로에서 생성된 클러스터는 수명이 짧습니다. 즉, 워크플로 지속시간 동안만 존재하며 마지막 워크플로 태스크의 일부로 삭제됩니다.

  • Apache Hadoop 워드카운트 작업을 보거나 모니터링하기 위한 Dataproc 작업. 작업 ID를 클릭하여 작업 로그 출력을 확인합니다.

  • 이 가이드에서 만든 Cloud Storage 버킷에 있는 wordcount 폴더의 워드카운트 결과를 확인하기 위한 Cloud Storage 브라우저

삭제

이 가이드에서 사용한 리소스에 대한 비용이 Google Cloud Platform 계정에 청구되지 않도록 다음을 따릅니다.

  1. Cloud Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리 페이지로 이동

  2. 삭제할 프로젝트가 조직에 연결되어 있으면 페이지 상단의 조직 목록에서 조직을 선택합니다.
  3. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제 를 클릭합니다.
  4. 대화상자에서 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

또는 이 가이드에서 사용된 리소스를 삭제할 수 있습니다.

  1. Cloud Composer 환경을 삭제합니다.
  2. Cloud Composer 환경의 Cloud Storage 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.
  3. Cloud Composer의 Pub/Sub 주제를 삭제합니다(composer-agentcomposer-backend)).

다음 단계