Cloud Composer를 사용하는 워크플로

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataproc
  • Compute Engine
  • Cloud Composer

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

시작하기 전에

프로젝트 설정

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Dataproc 워크플로 템플릿 만들기

로컬 터미널 창 또는 Cloud Shell에서 아래 나열된 명령어를 복사하고 실행하여 워크플로 템플릿을 만들고 정의합니다.

  1. sparkpi 워크플로 템플릿 만들기
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. sparkpi 워크플로 템플릿에 spark 작업을 추가합니다. 'compute' step-id 플래그는 SparkPi 작업을 식별합니다.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. 관리형, 단일 노드 클러스터를 사용하여 워크플로를 실행합니다. Dataproc이 클러스터를 만들고 워크플로를 실행한 다음 워크플로가 완료되면 클러스터를 삭제합니다.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. 워크플로 템플릿 만들기를 확인합니다.

    콘솔

    Google Cloud 콘솔의 Dataproc 워크플로 페이지에서 sparkpi 이름을 클릭하여 워크플로 템플릿 세부정보 페이지를 엽니다. 워크플로 템플릿의 이름을 클릭하여 sparkpi 템플릿 속성을 확인합니다.

    gcloud 명령어

    다음 명령어를 실행합니다.

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG 생성 및 Cloud Storage로 업로드

  1. 기존 Cloud Composer 환경을 사용하거나 새로 만듭니다.
  2. 환경 변수를 설정합니다.

    Airflow UI

    1. 툴바에서 관리 > 변수를 클릭합니다.
    2. 만들기를 클릭합니다.
    3. 다음 정보를 입력합니다.
      • 키: project_id
      • 값: PROJECT_ID — Google Cloud 프로젝트 ID
    4. 저장을 클릭합니다.

    gcloud 명령어

    다음 명령어를 입력합니다.

    • ENVIRONMENT는 Cloud Composer 환경의 이름입니다.
    • LOCATION은 Cloud Composer 환경이 위치한 리전입니다.
    • PROJECT_ID는 Cloud Composer 환경이 포함된 프로젝트의 프로젝트 ID입니다.
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. 다음 DAG 코드를 DataprocInstantiateWorkflowTemplateOperator를 사용하는 'composer-dataproc-dag.py'라는 파일에 로컬로 복사합니다.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Cloud Storage의 환경 폴더에 DAG를 업로드합니다. 업로드가 완료되면 Cloud Composer 환경 페이지에서 DAG 폴더 링크를 클릭합니다.

작업 상태 보기

Airflow UI

  1. Airflow 웹 인터페이스를 엽니다.
  2. DAG 페이지에서 DAG 이름(예: dataproc_workflow_dag)을 클릭합니다.
  3. DAG 세부정보 페이지에서 그래프 보기를 클릭합니다.
  4. 상태를 확인합니다.
    • 실패: 작업 주변에 빨간색 상자가 있습니다. 작업 위로 마우스 포인터를 올려놓고 상태: 실패를 찾을 수도 있습니다. 작업 주변에 빨간색 상자가 표시되며 실패했음을 나타냅니다.
    • 성공: 작업 주변에 녹색 상자가 있습니다. 작업 위로 마우스 포인터를 올려놓고 상태: 성공을 확인할 수도 있습니다. 작업 주변에 녹색 상자가 표시되며 성공했음을 나타냅니다.

콘솔

워크플로 탭을 클릭하여 워크플로 상태를 확인합니다.

gcloud 명령어

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

삭제

Google Cloud 계정에 요금이 부과되지 않도록 이 튜토리얼에서 사용한 리소스를 삭제할 수 있습니다.

  1. Cloud Composer 환경을 삭제합니다.

  2. 워크플로 템플릿을 삭제합니다.

다음 단계