Cloud Composer로 인프라 자동화

이 가이드에서는 Cloud Composer를 사용하여 클라우드 인프라를 자동화하는 방법을 보여줍니다. 이를 위해 이 가이드에서는 Compute Engine VM(가상 머신) 인스턴스의 자동 백업을 예약하는 방법을 예로 듭니다.

Cloud Composer는 Google Cloud의 완전 관리형 워크플로 조정 서비스입니다. Cloud Composer는 Python API를 사용하여 워크플로를 빌드하고, 워크플로를 자동으로 실행되도록 예약하거나 수동으로 시작하고, 그래픽 UI를 통해 작업 실행 현황을 실시간으로 모니터링할 수 있게 해줍니다.

Cloud Composer는 Apache Airflow를 기반으로 합니다. Google은 Google Kubernetes Engine(GKE) 클러스터를 기반으로 이 오픈소스 조정 플랫폼을 실행합니다. 이 클러스터는 Airflow 작업자를 관리하고 다른 Google Cloud 제품들과의 통합 기회를 제공합니다.

이 가이드는 인프라를 자동화하고 Cloud Composer의 핵심 기능을 기술적으로 탐구하는 데 관심이 있는 운영자, IT 관리자, 개발자를 대상으로 합니다. 이 가이드는 엔터프라이즈급 DR(재해 복구) 가이드가 아니며 백업 권장사항 가이드도 아닙니다. 엔터프라이즈의 DR 계획을 만드는 방법에 대한 자세한 내용은 재해 복구 계획 가이드를 참조하세요.

아키텍처 정의

Cloud Composer 워크플로는 DAG(Directed Acyclic Graph)를 만들어 정의합니다. Airflow 관점에서 볼 때 DAG는 방향성 상호 종속 항목을 반영하도록 구성된 작업의 모음입니다. 이 가이드에서는 Persistent Disk 스냅샷을 사용하여 Compute Engine 가상 머신 인스턴스를 백업하기 위해 정기적으로 실행되는 Airflow 워크플로를 정의하는 방법을 알아봅니다.

이 예시에 사용된 Compute Engine VM은 부팅 영구 디스크가 연결된 인스턴스로 구성됩니다. Cloud Composer 백업 워크플로는 스냅샷 지침(자세한 설명은 뒷부분 참조)에 따라 Compute Engine API를 호출하여 인스턴스를 중지하고 영구 디스크의 스냅샷을 만든 다음 인스턴스를 다시 시작합니다. 이 워크플로는 하나의 작업이 완료되어야 다음 작업을 진행합니다.

다음 다이어그램은 아키텍처를 요약한 것입니다.

인프라 자동화용 아키텍처

다음 섹션에서는 가이드를 시작하기 전에 Cloud Composer 환경을 만드는 방법을 보여줍니다. 이 환경의 장점은 여러 Google Cloud 제품이 사용되지만 각 항목을 개별적으로 구성할 필요가 없다는 것입니다.

  • Cloud Storage: Airflow DAG, 플러그인, 로그가 Cloud Storage 버킷에 저장됩니다.
  • Google Kubernetes Engine: Airflow 플랫폼은 마이크로 서비스 아키텍처에 기반을 두고 있으며, GKE에서 실행하기에 적합합니다.
    • Airflow 작업자는 Cloud Storage에서 플러그인 및 워크플로 정의를 로드하고 Compute Engine API를 사용하여 각 작업을 실행합니다.
    • Airflow 스케줄러는 구성된 주기와 적절한 작업 순서에 따라 백업이 실행되도록 합니다.
    • Redis는 Airflow 구성요소 간의 메시지 브로커로 사용됩니다.
    • Cloud SQL Proxy는 메타데이터 저장소와의 통신에 사용됩니다.
  • Cloud SQL 및 App Engine Flex: Cloud Composer는 메타데이터용 Cloud SQL 인스턴스와 Airflow UI를 제공하는 App Engine Flex 앱도 사용합니다. 이러한 리소스는 별도의 Google 관리 프로젝트에 있으므로 다이어그램에 도식화되지 않았습니다.

자세한 내용은 Cloud Composer 개요를 참조하세요.

워크플로 확장

이 가이드는 정해진 일정에 따라 단일 가상 머신의 스냅샷을 생성하는 간단한 사용 사례를 기반으로 합니다. 그러나 실제 상황을 기반으로 한 시나리오에는 조직의 여러 부분에 속하는 수백 개의 VM 또는 각기 다른 백업 일정이 필요한 시스템의 여러 계층이 포함될 수 있습니다. 확장은 Compute Engine VM 관련 예시뿐만 아니라 예약된 프로세스를 실행해야 하는 모든 인프라 구성요소에 적용됩니다.

Cloud Composer는 Cloud Scheduler 또는 cron의 대안에 불과한 것이 아니라 클라우드에서 호스팅되는 Apache Airflow 기반의 완전한 기능을 갖춘 워크플로 엔진이므로 이런 복잡한 사례에서 진가를 발휘합니다.

워크플로의 유연한 표현인 Airflow DAG는 단일 코드베이스에서 계속 실행되면서 실제 요구 사항에 맞게 조정됩니다. 사용 사례에 적합한 DAG를 빌드하려면 다음 두 가지 방법을 함께 사용하면 됩니다.

  • 같은 일정으로 프로세스를 시작할 수 있는 인프라 구성요소 그룹용으로 DAG 인스턴스를 하나 만듭니다.
  • 개별 일정이 필요한 인프라 구성요소 그룹에는 개별 DAG 인스턴스를 만듭니다.

DAG는 구성요소를 동시에 처리할 수 있습니다. 작업은 각 구성요소의 비동기 작업을 시작하거나 각 구성요소를 처리하는 브랜치를 만들어야 합니다. 코드에서 DAG를 동적으로 빌드하여 필요에 따라 브랜치 및 작업을 추가하거나 삭제할 수 있습니다.

또한 동일한 DAG 내 애플리케이션 계층 간에 종속 항목을 모델링할 수 있습니다. 예를 들어 앱 서버 인스턴스를 중지하기 전에 모든 웹 서버 인스턴스를 중지할 수 있습니다.

이러한 최적화는 본 가이드에서 다루지 않습니다.

스냅샷 관련 권장사항 적용

Persistent Disk는 가상 머신 인스턴스에 연결할 수 있고, 인스턴스의 기본 부팅 디스크 또는 중요 데이터의 보조 비부팅 디스크로 사용될 수 있는 지속형 블록 저장소입니다. PD는 가용성이 뛰어납니다. 쓰기 한 건당 세 개의 복제본이 기록되지만 Google Cloud 고객에게는 한 건에 대한 요금만 청구됩니다.

스냅샷은 특정 시점의 영구 디스크에 관한 정확한 복사본입니다. 스냅샷은 증분되고 압축되며 Cloud Storage에 투명하게 저장됩니다.

앱이 실행되는 동안 영구 디스크의 스냅샷을 생성할 수도 있습니다. 스냅샷에는 부분적으로 작성된 블록이 포함되지 않습니다. 그러나 백엔드가 스냅샷 생성 요청을 수신할 때 여러 블록에 걸친 쓰기 작업이 진행 중인 경우 해당 스냅샷에는 업데이트된 블록 중 일부만 포함될 수 있습니다. 이러한 불일치는 비정상적인 종료를 처리하는 것과 같은 방법으로 처리할 수 있습니다.

스냅샷의 일관성을 유지하려면 다음 지침을 따르는 것이 좋습니다.

  • 스냅샷 생성 프로세스 중에 디스크 쓰기를 최소화하거나 삼갑니다. 사용량이 많지 않은 시간에 백업을 예약하는 것이 좋습니다.
  • 보조 비부팅 디스크의 경우 데이터를 쓰고 파일 시스템을 고정하거나 마운트 해제하는 앱과 프로세스를 일시중지합니다.
  • 부팅 디스크의 루트 볼륨을 고정하는 것은 안전하지 않거나 현실적으로 불가능합니다. 스냅샷을 생성하기 전에 가상 머신 인스턴스를 중지하는 것이 적합한 방법이 될 수 있습니다.

    가상 머신 고정 또는 중지로 인해 발생하는 서비스 다운타임을 피하려면 고가용성 아키텍처를 사용하는 것이 좋습니다. 자세한 내용은 애플리케이션 재해 복구 시나리오를 참조하세요.

  • 스냅샷에 일관된 이름 지정 규칙을 사용합니다. 예를 들어 인스턴스, 디스크, 영역의 이름에 타임스탬프를 필요한 만큼 상세한 단위로 사용하세요.

일관된 스냅샷 생성에 대한 자세한 내용은 스냅샷 권장사항을 참조하세요.

목표

  • 커스텀 Airflow 연산자 및 Compute Engine용 센서 만들기
  • Airflow 연산자와 센서를 사용하여 Cloud Composer 워크플로 만들기
  • 일정한 간격으로 Compute Engine 인스턴스를 백업하도록 워크플로 예약

비용

가격 계산기를 사용하여 예상 사용량을 기준으로 예상 비용을 산출할 수 있습니다.

시작하기 전에

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Cloud Composer 환경을 만듭니다. 비용을 최소화하려면 디스크 크기로 20GB를 선택합니다.

    환경 만들기 페이지로 이동

    Cloud Composer 환경을 프로비저닝하는 데는 일반적으로 약 15분이 걸리지만 최대 1시간이 소요될 수 있습니다.

  5. 이 가이드의 전체 코드는 GitHub에서 확인할 수 있습니다. 파일을 살펴보면서 진행하려면 Cloud Shell에서 저장소를 여세요.

    Cloud Shell로 이동
  6. Cloud Shell 콘솔 홈 디렉터리에서 다음 명령어를 실행합니다.
    git clone https://github.com/GoogleCloudPlatform/composer-infra-python.git

이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않게 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

샘플 Compute Engine 인스턴스 설정

첫 번째 단계는 백업할 샘플 Compute Engine 가상 머신 인스턴스를 만드는 것입니다. 이 인스턴스는 오픈소스 콘텐츠 관리 시스템인 WordPress를 실행합니다.

Compute Engine에서 WordPress 인스턴스를 만들려면 다음 단계를 따르세요.

  1. Google Cloud Marketplace에서 WordPress Certified by Bitnami 시작 페이지로 이동합니다.
  2. Launch(시작)를 클릭합니다.
  3. 프로젝트 목록이 있는 팝업 창이 나타납니다. 이 가이드에서 이전에 만든 프로젝트를 선택합니다.

    Google Cloud는 프로젝트에 필요한 API를 구성하고, 잠시 기다린 후 WordPress Compute Engine 인스턴스에 대해 다른 구성 옵션이 포함된 화면을 표시합니다.

  4. 필요할 경우 부팅 디스크 유형을 SSD로 변경하여 인스턴스 부팅 속도를 높입니다.

  5. 배포를 클릭합니다.

    배포 상태를 볼 수 있는 Deployment Manager 화면으로 이동합니다.

    WordPress Deployment Manager 스크립트가 WordPress Compute Engine 인스턴스를 만들고, TCP 트래픽이 포트 80 및 443을 통해 이 인스턴스에 도달하도록 허용하는 방화벽 규칙 두 개도 만듭니다. 이 프로세스는 완료하는 데 몇 분 정도 걸릴 수 있으며, 각 항목이 배포되고 진행률 휠 아이콘이 표시됩니다.

    프로세스가 완료되면 WordPress 인스턴스가 준비되고 웹사이트 URL의 기본 콘텐츠가 제공됩니다. Deployment Manager 화면에는 웹사이트 URL(사이트 주소), 관리 콘솔 URL(관리 URL)과 사용자 및 비밀번호, 문서 링크, 추천하는 다음 단계가 표시됩니다.

    배포된 인스턴스를 보여주는 Deployment Manager

  6. 사이트 주소를 클릭하여 WordPress 인스턴스가 실행 중인지 확인합니다. 기본 WordPress 블로그 페이지가 표시되어야 합니다.

이제 샘플 Compute Engine 인스턴스가 준비되었습니다. 다음 단계는 해당 인스턴스의 영구 디스크에 대한 자동 증분 백업 프로세스를 구성하는 것입니다.

커스텀 Airflow 연산자 만들기

테스트 인스턴스의 영구 디스크를 백업하려면 인스턴스를 중지하고 영구 디스크의 스냅샷을 생성한 다음 인스턴스를 다시 시작하는 Airflow 워크플로를 만들면 됩니다. 이러한 각 작업은 커스텀 Airflow 연산자를 사용한 코드로 정의됩니다. 그런 다음 연산자 코드가 Airflow 플러그인에 그룹화됩니다.

이 섹션에서는 Compute Engine Python 클라이언트 라이브러리를 호출하여 인스턴스 수명 주기를 제어하는 커스텀 Airflow 연산자를 빌드하는 방법을 알아봅니다. 이 작업을 수행하는 다른 옵션이 있습니다. 예를 들면 다음과 같습니다.

  • Airflow BashOperator를 사용하여 gcloud compute 명령어를 실행합니다.
  • Airflow HTTPOperator를 사용하여 Compute Engine REST API에 직접 HTTP 호출을 실행합니다.
  • 커스텀 연산자를 정의할 필요 없이 Airflow PythonOperator를 사용하여 임의의 Python 함수를 호출합니다.

이 가이드에서는 이러한 옵션을 살펴보지 않습니다.

Compute Engine API에 대한 호출 승인

이 가이드에서 만드는 커스텀 연산자는 Python 클라이언트 라이브러리를 사용하여 Compute Engine API를 호출합니다. API 요청을 인증하고 승인해야 합니다. 권장 방법은 애플리케이션 기본 사용자 인증 정보(ADC)라는 전략을 사용하는 것입니다.

ADC 전략은 클라이언트 라이브러리에서 호출이 실행될 때마다 적용됩니다.

  1. 라이브러리가 GOOGLE_APPLICATION_CREDENTIALS 환경 변수에 서비스 계정이 지정되어 있는지 확인합니다.
  2. 서비스 계정이 지정되어 있지 않으면 라이브러리가 Compute Engine 또는 GKE에서 제공하는 기본 서비스 계정을 사용합니다.

이 두 방법이 실패하면 오류가 발생합니다.

이 가이드의 Airflow 연산자는 두 번째 방법에 해당합니다. Cloud Composer 환경을 만들면 GKE 클러스터가 프로비저닝됩니다. 이 클러스터의 노드가 Airflow 작업자 포드를 실행합니다. 그런 다음 이 작업자는 개발자가 정의한 커스텀 연산자로 워크플로를 실행합니다. 여기서는 환경을 만들 때 서비스 계정을 지정하지 않았으므로 ADC 전략에서 GKE 클러스터 노드의 기본 서비스 계정이 사용됩니다.

GKE 클러스터 노드는 Compute Engine 인스턴스입니다. 따라서 연산자 코드에서 Compute Engine 기본 서비스 계정과 연결된 사용자 인증 정보를 간편하게 가져올 수 있습니다.

def get_compute_api_client(self):
  credentials = GoogleCredentials.get_application_default()
  return googleapiclient.discovery.build(
      'compute', 'v1', cache_discovery=False, credentials=credentials)

이 코드는 애플리케이션 기본 사용자 인증 정보를 사용하여 Compute Engine API에 요청을 보낼 Python 클라이언트를 만듭니다. 다음 섹션에서는 각 Airflow 연산자를 만들 때 이 코드를 참조합니다.

기본 Compute Engine 서비스 계정을 사용하는 대신 서비스 계정을 만들고 이를 Airflow 관리 콘솔에서 연결된 서비스 계정으로 구성할 수 있습니다. 이 방법은 Airflow 연결 관리 페이지에 설명되어 있으며 Google Cloud 리소스 액세스를 더 세부적으로 제어할 수 있습니다. 이 가이드에서는 이러한 대안을 살펴보지 않습니다.

Compute Engine 인스턴스를 안전하게 종료

이 섹션에서는 첫 번째 커스텀 Airflow 연산자인 StopInstanceOperator의 생성을 자세히 다룹니다. 이 연산자는 Compute Engine API를 호출하여 WordPress를 실행 중인 Compute Engine 인스턴스를 중지합니다.

  1. Cloud Shell에서 nano 또는 vim과 같은 텍스트 편집기를 사용하여 gce_commands_plugin.py 파일을 엽니다.

    vi $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py
    
  2. 파일 상단에서 import를 살펴봅니다.

    import datetime
    import logging
    import time
    from airflow.models import BaseOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults
    import googleapiclient.discovery
    from oauth2client.client import GoogleCredentials

    주목할 만한 import는 다음과 같습니다.

    • BaseOperator: 모든 Airflow 커스텀 연산자가 상속해야 하는 기본 클래스입니다.
    • AirflowPlugin: 연산자 그룹을 만들어 플러그인을 형성하는 기본 클래스입니다.
    • apply_defaults: 연산자 생성자에 지정되지 않은 경우 인수를 기본값으로 채우는 함수 데코레이터입니다.
    • GoogleCredentials: 앱 기본 사용자 인증 정보를 검색하는 데 사용되는 클래스입니다.
    • googleapiclient.discovery: 기본 Google API 검색을 허용하는 클라이언트 라이브러리 진입점입니다. 이 경우 클라이언트 라이브러리는 Compute Engine API와 상호작용하도록 리소스를 빌드합니다.
  3. 이제 import 아래의 StopInstanceOperator 클래스를 확인합니다.

    class StopInstanceOperator(BaseOperator):
      """Stops the virtual machine instance."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def execute(self, context):
        logging.info('Stopping instance %s in project %s and zone %s',
                     self.instance, self.project, self.zone)
        self.compute.instances().stop(
            project=self.project, zone=self.zone, instance=self.instance).execute()
        time.sleep(90)

    StopInstanceOperator 클래스에는 3개의 메서드가 포함됩니다.

    • __init__: 클래스 생성자 프로젝트 이름, 인스턴스가 실행 중인 영역, 중지할 인스턴스의 이름을 받습니다. 또한 get_compute_api_client를 호출하여 self.compute 변수를 초기화합니다.
    • get_compute_api_client: Compute Engine API의 인스턴스를 반환하는 도우미 메서드입니다. GoogleCredentials에서 제공된 ADC를 사용하여 API에 인증을 수행하고 후속 호출을 승인합니다.
    • execute: BaseOperator에서 재정의된 기본 연산자 메서드입니다. Airflow는 이 메서드를 호출하여 연산자를 실행합니다. 이 메서드는 정보 메시지를 로그에 출력한 다음 Compute Engine API를 호출하여 생성자에 수신된 세 가지 매개변수가 나타내는 Compute Engine 인스턴스를 중지합니다. 끝에 있는 sleep() 함수는 인스턴스가 중지될 때까지 대기합니다. 프로덕션 환경에서는 연산자 교차 통신과 같은 보다 확정적인 방법을 사용해야 합니다. 이 기법은 이 가이드의 뒷부분에서 설명합니다.

Compute Engine API의 stop() 메서드는 가상 머신 인스턴스를 완전히 종료합니다. 운영체제는 WordPress용 종료 스크립트(/etc/init.d/bitnami)를 포함한 init.d 종료 스크립트를 실행합니다. 이 스크립트는 가상 머신이 다시 시작될 때 WordPress 시작도 처리합니다. /etc/systemd/system/bitnami.service.에서 종료 및 시작 구성을 포함한 서비스 정의를 살펴볼 수 있습니다.

고유하게 명명된 증분 백업 스냅샷 만들기

이 섹션에서는 두 번째 커스텀 연산자인 SnapshotDiskOperator를 만듭니다. 이 연산자는 인스턴스의 영구 디스크 스냅샷을 만듭니다.

이전 섹션에서 연 gce_commands_plugin.py 파일에서 SnapshotDiskOperator 클래스를 확인합니다.

class SnapshotDiskOperator(BaseOperator):
  """Takes a snapshot of a persistent disk."""

  @apply_defaults
  def __init__(self, project, zone, instance, disk, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    self.disk = disk
    super(SnapshotDiskOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def generate_snapshot_name(self, instance):
    # Snapshot name must match regex '(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'
    return ('' + self.instance + '-' +
            datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

  def execute(self, context):
    snapshot_name = self.generate_snapshot_name(self.instance)
    logging.info(
        ("Creating snapshot '%s' from: {disk=%s, instance=%s, project=%s, "
         "zone=%s}"),
        snapshot_name, self.disk, self.instance, self.project, self.zone)
    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

SnapshotDiskOperator 클래스에는 다음 메서드가 있습니다.

  • __init__: 클래스 생성자 StopInstanceOperator 클래스의 생성자와 비슷하지만, 프로젝트, 영역, 인스턴스 이름 외에도 이 생성자는 스냅샷을 생성할 디스크 이름을 수신합니다. 인스턴스에 둘 이상의 영구 디스크를 연결할 수 있기 때문입니다.
  • generate_snapshot_name: 이 샘플 메서드는 인스턴스 이름, 날짜, 1초 단위 시간을 사용해서 각 스냅샷에 대해 간단하고 고유한 이름을 만듭니다. 이름은 필요에 맞게 조정하세요. 예를 들어 인스턴스 하나에 여러 디스크가 연결될 경우 디스크 이름을 추가할 수 있고, 임시 스냅샷 생성 요청을 지원하려는 경우 시간 단위를 늘릴 수 있습니다.
  • execute: BaseOperator에서 재정의된 주 연산자 메서드입니다. Airflow 작업자는 이를 실행할 때 generate_snapshot_name 메서드를 사용하여 스냅샷 이름을 생성합니다. 그런 다음 정보 메시지를 출력하고 Compute Engine API를 호출하여 생성자에 수신된 매개변수를 사용하여 스냅샷을 만듭니다.

Compute Engine 인스턴스 시작

이 섹션에서는 세 번째이자 최종 커스텀 연산자인 StartInstanceOperator를 만듭니다. 이 연산자는 Compute Engine 인스턴스를 다시 시작합니다.

이전에 연 gce_commands_plugin.py 파일에서 파일 하단의 SnapshotDiskOperator 클래스를 확인합니다.

class StartInstanceOperator(BaseOperator):
  """Starts a virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.compute = self.get_compute_api_client()
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StartInstanceOperator, self).__init__(*args, **kwargs)

  def get_compute_api_client(self):
    credentials = GoogleCredentials.get_application_default()
    return googleapiclient.discovery.build(
        'compute', 'v1', cache_discovery=False, credentials=credentials)

  def execute(self, context):
    logging.info('Starting instance %s in project %s and zone %s',
                 self.instance, self.project, self.zone)
    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

StartInstanceOperator 클래스에는 다음 메서드가 있습니다.

  • __init__: 클래스 생성자 StopInstanceOperator 클래스의 생성자와 비슷합니다.
  • execute: BaseOperator에서 재정의된 주 연산자 메서드입니다. 이전 연산자와의 차이점은 해당 Compute Engine API를 호출하여 생성자 입력 매개변수에 지정된 인스턴스를 시작하는 것입니다.

Airflow 워크플로 정의

앞서 세 연산자가 포함된 Airflow 플러그인을 정의했습니다. 이러한 연산자는 Airflow 워크플로의 일부를 구성하는 작업을 정의합니다. 여기에 제시된 워크플로는 단순하고 선형이지만 Airflow 워크플로는 복잡한 Directed Acyclic Graph가 될 수 있습니다.

이 섹션에서는 세 연산자를 노출한 후 이러한 연산자를 사용하여 DAG를 만들어 Cloud Composer에 배포하고 DAG를 실행하는 플러그인 클래스를 만듭니다.

플러그인 만들기

현재 gce_commands_plugin.py 파일에는 시작, 스냅샷, 중지 연산자가 포함되어 있습니다. 워크플로에서 이러한 연산자를 사용하려면 플러그인 클래스에 포함해야 합니다.

  1. gce_commands_plugin.py 파일 하단의 GoogleComputeEnginePlugin 클래스를 확인합니다.

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator]

    AirflowPlugin에서 상속된 이 클래스는 플러그인에 내부 이름 gce_commands_plugin을 지정하고 여기에 세 개의 연산자를 추가합니다.

  2. gce_commands_plugin.py 파일을 닫습니다.

Directed Acyclic Graph 구성

DAG는 Airflow가 실행하는 워크플로를 정의합니다. DAG가 백업할 디스크를 알 수 있도록 몇 가지 변수를 정의해야 합니다. 즉, 디스크가 연결된 Compute Engine 인스턴스, 인스턴스가 실행 중인 영역, 모든 리소스를 사용할 수 있는 프로젝트를 정의해야 합니다.

DAG 소스 코드 자체에서 이러한 변수를 하드 코딩할 수 있지만 이러한 변수를 Airflow 변수로 정의하는 것이 가장 좋습니다. 이렇게 하면 모든 구성 변경사항을 중앙 집중식으로, 코드 배포와 독립적으로 관리할 수 있습니다.

DAG 구성을 정의합니다.

  1. Cloud Shell에서 Cloud Composer 환경의 위치를 설정합니다.

    LOCATION=[CLOUD_ENV_LOCATION]
    

    위치는 Cloud Composer 환경이 있는 Compute Engine 리전입니다(예: us-central1 또는 europe-west1). 환경 생성 당시 설정된 것이며 Cloud Composer 콘솔 페이지에서 볼 수 있습니다.

  2. Cloud Composer 환경 이름을 설정합니다.

    ENVIRONMENT=$(gcloud composer environments list \
        --format="value(name)" --locations $LOCATION)
    

    --format 매개변수를 사용하여 결과 테이블에서 name 열만 선택합니다. 하나의 환경만 생성되었다고 가정할 수 있습니다.

  3. 현재 Google Cloud 프로젝트의 이름을 사용하여 Airflow에서 PROJECT 변수를 만듭니다.

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set PROJECT $(gcloud config get-value project)
    

    각 항목의 의미는 다음과 같습니다.

    • gcloud composer environments run은 Airflow CLI 명령어를 실행하기 위해 사용됩니다.
    • variables Airflow 명령어는 PROJECT Airflow 변수를 gcloud config에서 반환된 값으로 설정합니다.
  4. WordPress 인스턴스의 이름을 사용하여 Airflow에 INSTANCE 변수를 만듭니다.

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set INSTANCE \
        $(gcloud compute instances list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    

    이 명령어는 --filter 매개변수를 사용하여 namewordpress 문자열을 포함하는 정규 표현식과 일치하는 인스턴스만 선택합니다. 이 접근법은 이러한 인스턴스가 하나만 있다고 가정하고 인스턴스와 디스크의 이름에 'wordpress'가 포함되어 있다고 가정합니다(기본값을 그대로 사용한 경우에 해당).

  5. WordPress 인스턴스 영역을 사용하여 Airflow에 ZONE 변수를 만듭니다.

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set ZONE \
        $(gcloud compute instances list \
        --format="value(zone)" --filter="name~'.*wordpress.*'")
    
  6. WordPress 인스턴스에 연결된 영구 디스크의 이름을 사용하여 Airflow에 DISK 변수를 만듭니다.

    gcloud composer environments run $ENVIRONMENT --location $LOCATION \
        variables -- --set DISK \
        $(gcloud compute disks list \
        --format="value(name)" --filter="name~'.*wordpress.*'")
    
  7. Airflow 변수가 올바르게 생성되었는지 확인합니다.

    1. Cloud Console에서 Cloud Composer 페이지로 이동합니다.

      Cloud Composer 페이지로 이동

    2. Airflow 웹 서버 열에서 Airflow 링크를 클릭합니다. Airflow 웹 서버 기본 페이지를 보여주는 새로운 탭이 열립니다.

    3. 관리를 클릭한 다음 변수를 클릭합니다.

      DAG 구성 변수 목록이 표시됩니다.

      DAG 구성 변수

Directed Acyclic Graph 만들기

DAG 정의는 전용 Python 파일에 있습니다. 다음 단계는 플러그인에서 세 연산자를 연결하여 DAG를 만드는 것입니다.

  1. Cloud Shell에서 nano 또는 vim과 같은 텍스트 편집기를 사용하여 backup_vm_instance.py 파일을 엽니다.

    vi $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py
    
  2. 파일 상단에서 import를 살펴봅니다.

    import datetime
    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators import SnapshotDiskOperator
    from airflow.operators import StartInstanceOperator
    from airflow.operators import StopInstanceOperator
    from airflow.operators.dummy_operator import DummyOperator

    이러한 import를 요약하면 다음과 같습니다.

    • DAG는 Airflow에서 정의한 Directed Acyclic Graph 클래스입니다.
    • DummyOperator는 워크플로 시각화 향상을 위해 시작 및 종료 작업 없음 연산자를 만드는 데 사용됩니다. 더 복잡한 DAG에서는 DummyOperator를 사용하여 브랜치를 조인하고 SubDAG를 만듭니다.
    • DAG는 이전 섹션에서 정의된 세 가지 연산자를 사용합니다.
  3. 연산자 생성자에 전달할 매개변수의 값을 정의합니다.

    INTERVAL = '@daily'
    START_DATE = datetime.datetime(2018, 7, 16)
    PROJECT = Variable.get('PROJECT')
    ZONE = Variable.get('ZONE')
    INSTANCE = Variable.get('INSTANCE')
    DISK = Variable.get('DISK')

    각 항목의 의미는 다음과 같습니다.

    • INTERVAL은 백업 워크플로 실행 빈도를 정의합니다. 앞의 코드는 Airflow 크론 사전 설정을 사용하여 일일 반복을 지정합니다. 다른 간격을 사용하려면 DAG 실행 참조 페이지를 참조하세요. 이 일정과 상관없이 워크플로를 수동으로 트리거할 수도 있습니다.
    • START_DATE는 백업이 시작되도록 예약된 특정 시점을 정의합니다. 이 값을 변경할 필요는 없습니다.
    • 나머지 값은 이전 섹션에서 구성한 Airflow 변수에서 검색됩니다.
  4. 다음 코드를 사용하여 이전에 정의한 매개변수 중 일부로 DAG를 만듭니다. 또한 이 코드는 Cloud Composer UI에 표시되는 이름과 설명을 DAG에 제공합니다.

    dag1 = DAG('backup_vm_instance',
               description='Backup a Compute Engine instance using an Airflow DAG',
               schedule_interval=INTERVAL,
               start_date=START_DATE,
               catchup=False)
  5. 연산자 인스턴스에 해당하는 작업으로 DAG를 채웁니다.

    ## Dummy tasks
    begin = DummyOperator(task_id='begin', retries=1, dag=dag1)
    end = DummyOperator(task_id='end', retries=1)
    
    ## Compute Engine tasks
    stop_instance = StopInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
    snapshot_disk = SnapshotDiskOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        disk=DISK, task_id='snapshot_disk')
    start_instance = StartInstanceOperator(
        project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='start_instance')

    이 코드는 정의된 매개변수를 해당 연산자 생성자에 전달하여 워크플로에 필요한 모든 작업을 인스턴스화합니다.

    • task_id 값은 Cloud Composer UI에 표시되는 고유 ID입니다. 나중에 이 ID를 사용하여 작업 간에 데이터를 전달합니다.
    • retries는 실패 전 작업을 재시도할 횟수를 설정합니다. DummyOperator 작업의 경우 이러한 값이 무시됩니다.
    • dag=dag는 작업이 이전에 생성된 DAG에 연결되었음을 나타냅니다. 이 매개변수는 워크플로의 첫 번째 작업에만 필요합니다.
  6. 워크플로 DAG를 구성하는 작업의 순서를 정의합니다.

    # Airflow DAG definition
    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    
  7. gce_commands_plugin.py 파일을 닫습니다.

워크플로 실행

이제 DAG 연산자가 나타내는 워크플로를 Cloud Composer에서 실행할 준비가 되었습니다. Cloud Composer는 연결된 Cloud Storage 버킷에서 DAG 및 플러그인 정의를 읽습니다. 이 버킷과 해당 dagsplugins 디렉터리는 Cloud Composer 환경을 만들 때 자동으로 생성되었습니다.

Cloud Shell을 사용하여 DAG 및 플러그인을 연결된 Cloud Storage 버킷에 복사할 수 있습니다.

  1. Cloud Shell에서 버킷 이름을 가져옵니다.

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    이름이 gs://[REGION]-{ENVIRONMENT_NAME]-{ID}-bucket/. 형식인 단일 버킷이 있어야 합니다.

  2. 다음 스크립트를 실행하여 DAG 및 플러그인 파일을 해당 버킷 디렉터리에 복사합니다.

    gsutil cp $HOME/composer-infra-python/no_sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/no_sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    버킷 이름 끝에는 이미 슬래시가 포함되어 있으므로 $BUCKET 변수를 큰따옴표로 묶어야 합니다.

  3. Cloud Console에서 Cloud Composer 페이지로 이동합니다.

    Cloud Composer 페이지로 이동

  4. Airflow 웹 서버 열에서 Airflow 링크를 클릭합니다. Airflow 웹 서버 기본 페이지를 보여주는 새로운 탭이 열립니다. 2~3분 정도 기다린 후 페이지를 새로고칩니다. 페이지가 준비될 때까지 기다렸다가 새로고치는 과정을 몇 번 반복해야 할 수 있습니다.

    새로 만든 DAG를 보여주는 목록이 다음과 유사하게 표시됩니다.

    DAG 목록

    코드에 구문 오류가 있으면 DAG 테이블 맨 위에 메시지가 나타납니다. 런타임 오류가 있으면 DAG 실행 아래에 표시됩니다. 계속하기 전에 오류를 수정하세요. 가장 쉬운 방법은 GitHub 저장소에서 버킷으로 파일을 다시 복사하는 것입니다.

  5. 보다 자세한 스택 추적을 보려면 Cloud Shell에서 다음 명령어를 실행합니다.

    gcloud composer environments run $ENVIRONMENT --location $LOCATION list_dags
    
  6. Airflow가 DAG 실행 열 아래에 표시된 워크플로를 즉시 실행하기 시작합니다.

    워크플로가 이미 진행 중이지만 다시 실행해야 하는 경우 다음 단계를 통해 수동으로 트리거할 수 있습니다.

    1. 링크 열에서 첫 번째 아이콘 DAG 트리거를 클릭합니다(이전 스크린샷에 화살표로 표시됨).
    2. 계속하시겠습니까?라는 확인 팝업 창에서 확인을 클릭합니다.

      몇 초 내로 워크플로가 시작되고 DAG 실행 아래에 밝은 녹색 원 모양으로 새로운 실행이 나타납니다.

  7. 링크 열에서 그래프 보기 아이콘을 클릭합니다(이전 스크린샷에 화살표로 표시됨).

    Cloud Composer 스크린샷

    그래프 보기는 워크플로, 성공적으로 실행된 작업(진한 녹색 테두리), 실행 중인 작업(밝은 녹색 테두리), 대기 중인 작업(테두리 없음)을 보여줍니다. 작업을 클릭하여 로그와 세부정보를 보고 다른 작업을 할 수 있습니다.

  8. 실행 상태를 추적하려면 오른쪽 상단 모서리에서 새로고침 버튼을 주기적으로 클릭합니다.

    수고하셨습니다. 첫 번째 Cloud Composer 워크플로 실행을 완료했습니다. 워크플로가 완료되면 Compute Engine 인스턴스 영구 디스크의 스냅샷이 생성됩니다.

  9. Cloud Shell에서 스냅샷이 생성되었는지 확인합니다.

    gcloud compute snapshots list
    

    또는 Cloud Console 메뉴를 사용해서 Compute Engine 스냅샷 페이지로 이동할 수 있습니다.

    Compute Engine 스냅샷 페이지

이때 스냅샷 하나가 표시되어야 합니다. 지정된 일정에 따라 수동 또는 자동으로 후속 워크플로 실행이 트리거되면 추가 스냅샷이 생성됩니다.

스냅샷은 증분 방식입니다. 첫 번째 스냅샷은 Persistent Disk의 모든 블록이 압축된 형태로 포함되어 있기 때문에 가장 큽니다. 이후 스냅샷에는 이전 스냅샷에서 변경된 블록과 변경되지 않은 블록에 대한 참조만 포함됩니다. 따라서 후속 스냅샷은 첫 번째 스냅샷보다 작으며 생성하는 데 드는 시간과 비용도 적습니다.

스냅샷이 삭제되면 스냅샷 체인에 저장되는 연속 델타의 일관성을 유지하기 위해 해당 데이터가 다음 해당 스냅샷으로 이동됩니다. 모든 스냅샷이 삭제된 경우에만 영구 디스크의 모든 백업 데이터가 삭제됩니다.

커스텀 Airflow 센서 만들기

워크플로를 실행할 때 각 단계를 완료하는 데 약간의 시간이 걸릴 수 있습니다. 이러한 대기 시간이 걸리는 이유는 다음 작업을 시작하기 전 Compute Engine API가 해당 작업을 완료할 수 있도록 마지막에 시간을 제공하는 sleep() 지침이 연산자에 포함되기 때문입니다.

그러나 이 접근법은 최적의 방법이 아니므로 예기치 않은 문제가 발생할 수 있습니다. 예를 들어 스냅샷을 생성하는 동안 증분 스냅샷의 대기 시간이 너무 길어서 이미 완료된 작업을 기다리는 데 시간을 낭비할 수 있습니다. 또는 대기 시간이 너무 짧을 수 있습니다. 이로 인해 머신이 시작될 때 인스턴스가 완전히 중지되지 않았거나 스냅샷 프로세스가 완료되지 않아서 전체 워크플로가 실패하거나 신뢰할 수 없는 결과가 생성될 수 있습니다.

다음 작업에 이전 작업이 완료되었음을 알릴 수 있어야 합니다. 한 가지 해결책은 Airflow 센서를 사용하여 일부 기준이 충족될 때까지 워크플로를 일시중지하는 것입니다. 이 사례에서는 이전 Compute Engine 작업이 성공적으로 완료되는 것이 기준에 해당합니다.

작업 간 교차 통신 데이터 공유

작업이 서로 통신해야 하는 경우 Airflow는 XCom 또는 '교차 통신'이라고 하는 메커니즘을 제공합니다. XCom은 작업이 키, 값, 타임스탬프로 구성된 메시지를 교환할 수 있게 해줍니다.

XCom을 사용하여 메시지를 전달하는 가장 간단한 방법은 연산자가 execute() 메서드에서 값을 반환하는 것입니다. 이 값은 Python이 pickle 모듈을 사용하여 직렬화할 수 있는 모든 객체가 될 수 있습니다.

이전 섹션에 설명된 3개 연산자는 Compute Engine API를 호출합니다. 이러한 API 호출은 모두 작업 리소스 객체를 반환합니다. 이러한 객체는 Airflow 연산자에 대한 비동기 요청과 같은 비동기 요청을 관리하는 데 사용됩니다. 각 객체에는 Compute Engine 작업의 최신 상태를 폴링하는 데 사용할 수 있는 name 필드가 포함됩니다.

작업 리소스 객체의 name을 반환하도록 연산자를 수정합니다.

  1. Cloud Shell에서 nano 또는 vim과 같은 텍스트 편집기를 사용하여 이번에는 sensor/plugins 디렉터리에서 gce_commands_plugin.py 파일을 엽니다.

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    
  2. StopInstanceOperator의 실행 메서드에서 다음 코드를 확인합니다.

    self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(90)

    이 코드는 다음 코드로 대체되었습니다.

    operation = self.compute.instances().stop(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']

    각 항목의 의미는 다음과 같습니다.

    • 첫 번째 줄은 API 호출의 반환 값을 operation 변수에 캡처합니다.
    • 두 번째 줄은 execute() 메서드에서 작업 name 필드를 반환합니다. 이 명령어는 pickle을 사용하여 이름을 직렬화하고 XCom 작업 간 공유 공간으로 내보냅니다. 나중에 후입 선출 순서로 값을 가져옵니다.

    한 작업이 여러 값을 내보내야 하는 경우 값을 반환하는 대신 xcom_push()를 바로 호출하여 XCom에 명시적 key를 제공할 수 있습니다.

  3. 마찬가지로 SnapshotDiskOperator의 실행 메서드에서 다음 코드를 확인합니다.

    self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    time.sleep(120)

    이 코드는 다음 코드로 대체되었습니다.

    operation = self.compute.disks().createSnapshot(
        project=self.project, zone=self.zone, disk=self.disk,
        body={'name': snapshot_name}).execute()
    return operation['name']

    이 코드에는 관련이 없는 두 개의 이름이 있습니다. 첫 번째는 스냅샷 이름이고, 두 번째는 작업 이름입니다.

  4. 마지막으로 StartInstanceOperator의 실행 메서드에서 다음 코드의 작동 방법을 확인합니다.

    self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    time.sleep(20)

    이 코드는 다음 코드로 대체되었습니다.

    operation = self.compute.instances().start(
        project=self.project, zone=self.zone, instance=self.instance).execute()
    return operation['name']
  5. 이제 gce_commands_plugin.py 파일 전체에서 sleep() 메서드가 호출되지 않습니다. 파일에서 sleep을 검색하여 이를 확인하세요. 또는 이 섹션의 이전 단계를 다시 확인하세요.

    코드에서 sleep() 호출이 수행되지 않기 때문에 파일 상단의 import 섹션에서 다음 줄이 삭제됩니다.

    import time
    
  6. gce_commands_plugin.py 파일을 닫습니다.

센서 구현 및 노출

이전 섹션에서는 Compute Engine 작업 이름을 반환하도록 각 연산자를 수정했습니다. 이 섹션에서는 작업 이름을 사용하여 Compute Engine API에서 각 작업의 완료 여부를 폴링하는 Airflow 센서를 만듭니다.

  1. Cloud Shell에서 nano 또는 vim과 같은 텍스트 편집기를 사용하여 sensor/plugins 디렉터리에서 gce_commands_plugin.py 파일을 엽니다.

    vi $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py
    

    import 섹션 상단의 from airflow.models import BaseOperator 줄 바로 아래에서 다음 코드 줄을 확인합니다.

    from airflow.operators.sensors import BaseSensorOperator
    

    모든 센서는 BaseSensorOperator 클래스에서 파생되며 poke() 메서드를 재정의해야 합니다.

  2. OperationStatusSensor 클래스를 살펴봅니다.

    class OperationStatusSensor(BaseSensorOperator):
      """Waits for a Compute Engine operation to complete."""
    
      @apply_defaults
      def __init__(self, project, zone, instance, prior_task_id, *args, **kwargs):
        self.compute = self.get_compute_api_client()
        self.project = project
        self.zone = zone
        self.instance = instance
        self.prior_task_id = prior_task_id
        super(OperationStatusSensor, self).__init__(*args, **kwargs)
    
      def get_compute_api_client(self):
        credentials = GoogleCredentials.get_application_default()
        return googleapiclient.discovery.build(
            'compute', 'v1', cache_discovery=False, credentials=credentials)
    
      def poke(self, context):
        operation_name = context['task_instance'].xcom_pull(
            task_ids=self.prior_task_id)
        result = self.compute.zoneOperations().get(
            project=self.project, zone=self.zone,
            operation=operation_name).execute()
    
        logging.info(
            "Task '%s' current status: '%s'", self.prior_task_id, result['status'])
        if result['status'] == 'DONE':
          return True
        else:
          logging.info("Waiting for task '%s' to complete", self.prior_task_id)
          return False

    OperationStatusSensor 클래스에는 다음 메서드가 있습니다.

    • __init__: 클래스 생성자 이 생성자는 작업자용 매개변수와 유사한 매개변수를 받습니다(단, prior_task_id는 예외). 이 매개변수는 이전 작업의 ID입니다.
    • poke: BaseSensorOperator에서 재정의된 주 센서 메서드입니다. Airflow는 이 메서드가 True를 반환할 때까지 메서드를 60초마다 호출합니다. 이 경우에만 다운스트림 작업을 실행할 수 있습니다.

      poke_interval 매개변수를 생성자에 전달하여 재시도 간격을 구성할 수 있습니다. 또한 timeout을 정의할 수 있습니다. 자세한 내용은 BaseSensorOperator API 참조 자료를 참조하세요.

      앞의 poke 메서드 구현에서 첫 번째 줄은 xcom_pull() 호출입니다. 이 메서드는 prior_task_id로 식별된 작업의 최근 XCom 값을 가져옵니다. 값은 Compute Engine 작업의 이름이며 operation_name 변수에 저장됩니다.

      그런 다음 코드가 zoneOperations.get() 메서드를 실행하고 operation_name을 매개변수로 전달하여 작업의 최신 상태를 가져옵니다. 상태가 DONE이면 poke() 메서드가 True를 반환하고, 그렇지 않으면 False를 반환합니다. 전자의 경우 다운스트림 작업이 시작됩니다. 후자의 경우 워크플로 실행이 일시 중지된 상태로 유지되고 poke_interval초 후에 poke() 메서드가 다시 호출됩니다.

  3. 파일 하단에서 플러그인으로 내보낸 작업자 목록에 OperationStatusSensor를 추가하도록 GoogleComputeEnginePlugin 클래스가 어떻게 업데이트되었는지 확인합니다.

    class GoogleComputeEnginePlugin(AirflowPlugin):
      """Expose Airflow operators and sensor."""
    
      name = 'gce_commands_plugin'
      operators = [StopInstanceOperator, SnapshotDiskOperator,
                   StartInstanceOperator, OperationStatusSensor]
  4. gce_commands_plugin.py 파일을 닫습니다.

워크플로 업데이트

플러그인에서 센서를 만든 후 워크플로에 센서를 추가할 수 있습니다. 이 섹션에서는 세 연산자와 그 사이의 센서 작업을 모두 포함하는 최종 상태로 워크플로를 업데이트합니다. 그런 다음 업데이트된 워크플로를 실행하고 확인합니다.

  1. Cloud Shell에서 nano 또는 vim과 같은 텍스트 편집기를 사용하여 이번에는 sensor/dags 디렉터리에서 backup_vm_instance.py 파일을 엽니다.

    vi $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py
    
    
  2. import 섹션의 from airflow operators import StartInstanceOperator 줄 아래에서 새로 생성된 센서를 가져왔는지 확인합니다.

    from airflow.operators import OperationStatusSensor
    
  3. ## Wait tasks 주석 다음 줄을 살펴봅니다.

    ## Wait tasks
    wait_for_stop = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='stop_instance', poke_interval=15, task_id='wait_for_stop')
    wait_for_snapshot = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='snapshot_disk', poke_interval=10,
        task_id='wait_for_snapshot')
    wait_for_start = OperationStatusSensor(
        project=PROJECT, zone=ZONE, instance=INSTANCE,
        prior_task_id='start_instance', poke_interval=5, task_id='wait_for_start')

    이 코드는 OperationStatusSensor를 재사용하여 3개의 중간 '대기 작업'을 정의합니다. 이러한 각 작업은 이전 작업이 완료되기를 기다립니다. 다음 매개변수가 센서 생성자에 전달됩니다.

    • 파일에 이미 정의되어 있는 WordPress 인스턴스의 PROJECT, ZONE, INSTANCE
    • prior_task_id: 센서가 대기 중인 작업의 ID입니다. 예를 들어 wait_for_stop 작업은 ID가 stop_instance인 작업이 완료될 때까지 기다립니다.

    • poke_interval: 센서의 poke() 메서드 호출 재시도 간격 중 Airflow가 기다려야 하는 시간(초)입니다. 즉, prior_task_id가 이미 완료되었는지 확인하는 빈도입니다.

    • task_id: 새로 생성된 대기 작업의 ID입니다.

  4. 파일 하단에서 다음 코드를 확인합니다.

    begin >> stop_instance >> snapshot_disk >> start_instance >> end
    

    이 코드는 다음 코드로 대체되었습니다.

    begin >> stop_instance >> wait_for_stop >> snapshot_disk >> wait_for_snapshot \
            >> start_instance >> wait_for_start >> end
    

    이 줄은 전체 백업 워크플로를 정의합니다.

  5. backup_vm_instance.py 파일을 닫습니다.

이제 연결된 Cloud Storage 버킷에서 DAG 및 플러그인을 복사해야 합니다.

  1. Cloud Shell에서 버킷 이름을 가져옵니다.

    BUCKET=$(gsutil ls)
    echo $BUCKET
    

    이름이 gs://[REGION]-[ENVIRONMENT_NAME]-[ID]-bucket/. 형식인 단일 버킷이 표시되어야 합니다.

  2. 다음 스크립트를 실행하여 DAG 및 플러그인 파일을 해당 버킷 디렉터리에 복사합니다.

    gsutil cp $HOME/composer-infra-python/sensor/plugins/gce_commands_plugin.py "$BUCKET"plugins
    gsutil cp $HOME/composer-infra-python/sensor/dags/backup_vm_instance.py "$BUCKET"dags
    

    버킷 이름에는 이미 후행 슬래시가 포함되어 있으므로 $BUCKET 변수를 큰따옴표로 묶어야 합니다.

  3. 업데이트된 워크플로를 Airflow에 업로드합니다.

    1. Cloud Console에서 Cloud Composer 페이지로 이동합니다.

      Cloud Composer 페이지로 이동

    2. Airflow 열에서 Airflow 웹 서버 링크를 클릭하여 Airflow 기본 페이지를 표시합니다.

    3. Airflow가 플러그인과 워크플로를 자동으로 업데이트할 때까지 2~3분 동안 기다립니다. DAG 테이블이 잠시 비어 있는 것을 볼 수 있습니다. 링크 섹션이 일관적으로 표시될 때까지 페이지를 몇 번 새로고치세요.

    4. 오류가 표시되지 않았는지 확인하고 링크 섹션에서 트리 보기를 클릭합니다.

      Cloud Composer 트리 보기 페이지 스크린샷 왼쪽에 워크플로가 상향식 트리로 표시됩니다. 오른쪽에는 각기 다른 날짜의 작업 실행 그래프가 표시됩니다. 녹색 사각형은 해당 작업과 날짜의 성공적인 실행을 의미합니다. 흰색 사각형은 실행된 적이 없는 작업을 의미합니다. 새 센서 작업으로 DAG를 업데이트했으므로 이러한 모든 작업이 흰색으로 표시되고 Compute Engine 작업은 녹색으로 표시됩니다.

    5. 업데이트된 백업 워크플로를 실행합니다.

      1. 상단 메뉴에서 DAG를 클릭하여 기본 페이지로 돌아갑니다.
      2. 링크 열에서 DAG 트리거를 클릭합니다.
      3. 계속하시겠습니까?라는 확인 팝업 창에서 확인을 클릭합니다. 새로운 workflowA 실행이 시작되고 DAG 실행 열에 밝은 녹색 원 모양으로 표시됩니다.
    6. 링크에서 워크플로 실행을 실시간으로 관찰하려면 그래프 보기 아이콘을 클릭합니다.

    7. 오른쪽에 있는 새로고침 버튼을 클릭하여 작업 실행을 추적합니다. 각 센서 작업에서 이전 작업이 완료될 때까지 기다리기 위해 워크플로 흐름이 중지되는 방식에 유의하세요. 대기 시간은 하드 코딩된 sleep 값에 의존하지 않고 각 작업의 필요에 맞게 조정됩니다.

    Cloud Composer 작업 실행 스크린샷

  4. 선택적으로 워크플로 중 Cloud Console로 돌아가서 Compute Engine 메뉴를 선택하고, VM 인스턴스를 클릭하여 가상 머신이 중지되고 재시작되는 방법을 확인합니다. 또한 스냅샷을 클릭하여 새 스냅샷이 생성되는 것을 볼 수 있습니다.

이제 Compute Engine 인스턴스에서 스냅샷을 만드는 백업 워크플로가 실행된 상태입니다. 이 스냅샷은 모범 사례를 따르고 센서를 사용하여 흐름을 최적화합니다.

스냅샷에서 인스턴스 복원

스냅샷을 생성하는 것은 백업 과정의 일부일 뿐입니다. 다른 부분은 스냅샷에서 인스턴스를 복원하는 것입니다.

스냅샷을 사용하여 인스턴스를 만들려면 다음 안내를 따르세요.

  1. Cloud Shell에서 사용 가능한 스냅샷 목록을 가져옵니다.

    gcloud compute snapshots list
    

    출력은 다음과 비슷합니다.

    NAME                              DISK_SIZE_GB  SRC_DISK                            STATUS
    wordpress-1-vm-2018-07-18-120044  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-120749  10            us-central1-c/disks/wordpress-1-vm  READY
    wordpress-1-vm-2018-07-18-125138  10            us-central1-c/disks/wordpress-1-vm  READY
    
  2. 스냅샷을 선택하고 이 스냅샷으로 독립 실행형 부팅 영구 디스크를 만듭니다. 대괄호로 묶인 자리 표시자를 고유한 값으로 바꿉니다.

    gcloud compute disks create [DISK_NAME] --source-snapshot [SNAPSHOT_NAME] \
        --zone=[ZONE]
    

    각 항목의 의미는 다음과 같습니다.

    • DISK_NAME은 새로운 독립 실행형 부팅 영구 디스크의 이름입니다.
    • SNAPSHOT_NAME은 이전 출력의 첫 번째 열에서 선택한 스냅샷입니다.
    • ZONE은 새 디스크가 생성될 컴퓨팅 영역(zone)입니다.
  3. 부팅 디스크를 사용하여 새 인스턴스를 만듭니다. [INSTANCE_NAME]을 생성하려는 인스턴스의 이름으로 바꿉니다.

    gcloud compute instances create [INSTANCE_NAME] --disk name=[DISK_NAME],boot=yes \
        --zone=ZONE --tags=wordpress-1-tcp-443,wordpress-1-tcp-80
    

    명령어에 이러한 두 태그를 지정하면 인스턴스가 초기 WordPress 인스턴스용으로 생성된 기존 방화벽 규칙 때문에 포트 443 및 80에서 들어오는 트래픽을 자동으로 수신할 수 있습니다.

    이전 명령어에서 반환한 새 인스턴스의 외부 IP를 기록해 두세요.

  4. WordPress가 새로 생성된 인스턴스에서 실행 중인지 확인합니다. 새 브라우저 탭에서 외부 IP 주소로 이동합니다. WordPress 기본 방문 페이지가 표시됩니다.

  5. 또는 Console의 스냅샷을 사용하여 인스턴스를 만듭니다.

    1. Cloud Console에서 스냅샷 페이지로 이동합니다.

      스냅샷 페이지로 이동

    2. 가장 최근의 스냅샷을 클릭합니다.

    3. 인스턴스 만들기를 클릭합니다.

    4. 새 VM 인스턴스 양식에서 관리, 보안, 디스크, 네트워킹, 단독 임대를 클릭한 후 네트워킹 탭을 클릭합니다.

    5. wordpress-1-tcp-443wordpress-1-tcp-80네트워크 태그 필드에 추가하고 각 태그를 입력할 때마다 Enter 키를 누릅니다. 이러한 태그에 대한 설명은 위의 내용을 참조하세요.

    6. 만들기를 클릭합니다.

      최신 스냅샷에 기반을 둔 새 인스턴스가 생성되고 콘텐츠를 제공할 준비가 되었습니다.

  6. Compute Engine 인스턴스 페이지를 열고 새 인스턴스의 외부 IP를 기록해 둡니다.

  7. WordPress가 새로 생성된 인스턴스에서 실행 중인지 확인합니다. 새 브라우저 탭에서 외부 IP로 이동합니다.

자세한 내용은 스냅샷을 사용하여 인스턴스 만들기를 참조하세요.

삭제

  1. Cloud Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

다음 단계