환경 업그레이드

이 페이지에서는 환경에서 실행되는 Airflow 버전 또는 Cloud Composer 버전을 업그레이드하는 방법을 설명합니다.

업그레이드하는 동안 Cloud Composer는 다음을 수행합니다.

  • Airflow 스케줄러 및 작업자 pod를 새 Kubernetes 네임스페이스에 다시 배포합니다. 업그레이드가 완료되면 Airflow가 새로운 MySQL 데이터베이스를 사용합니다. 데이터베이스 이름은 Kubernetes 네임스페이스와 일치합니다. DAG 실행 기록은 보존됩니다.

  • Airflow airflow_db 연결이 새 Cloud SQL 데이터베이스를 가리키도록 업데이트합니다.

이러한 변경사항으로 인해 Pod 액세스 방법과 Cloud SQL 데이터베이스 연결 방법도 달라집니다.

  • 업그레이드 후에 GKE 클러스터의 Pod에 액세스하려면 네임스페이스를 인식하는 kubectl 명령어를 사용해야 합니다. 예를 들어 클러스터의 Pod를 나열하려면 kubectl get pods -A를 사용합니다. Pod에서 명령어를 실행하려면 kubectl exec -n <NAMESPACE> ...를 사용합니다.
  • SQL 프록시를 직접 참조하는 Airflow 연결과 워크로드를 사용하는 경우 기본 네임스페이스를 호스트 이름 airflow-sqlproxy-service.default의 일부로 사용합니다(airflow-sqlproxy-service가 아님).

Cloud Composer를 업그레이드해도 Google Kubernetes Engine 노드 VM IP 주소, Cloud SQL 인스턴스 IP 주소, Cloud Storage 버킷 또는 Airflow 웹 서버 도메인 이름과 같이 사용자 환경의 리소스에 연결하는 방법에는 변화가 없습니다.

시작하기 전에

  • 환경 업그레이드는 현재 미리보기 상태입니다. 프로덕션 환경에서는 이 기능을 주의해서 사용해야 합니다.
  • 업그레이드하려면 roles/editor 역할 또는 roles/composer.admin 역할이 필요합니다.
  • 업그레이드하기 전에 모든 DAG를 일시중지하고 진행 중인 태스크가 완료될 때까지 기다립니다.
  • Cloud Composer, Airflow 버전 또는 둘 다를 동시에 업그레이드할 수 있습니다.
  • 업그레이드하려는 Cloud Composer-Airflow 조합은 출시된 버전이어야 합니다.
    • 사용 가능한 업그레이드는 사용 가능한 업그레이드 보기를 참조하세요. 최신 기능과 수정사항을 가져오려면 최신 Cloud Composer 출시 버전으로 업그레이드하는 것이 좋습니다.
    • 지원되는 버전의 PyPI 패키지 및 맞춤설정 목록은 Cloud Composer 버전 목록을 참조하세요.

      업그레이드하기 전에 Airflow와 Cloud Composer의 현재 버전과 업그레이드 버전의 차이점을 알고 있어야 합니다. 호환되지 않는 변경사항으로 인해 DAG가 중단될 수 있습니다.

Airflow 데이터베이스 유지보수

Airflow 데이터베이스가 시간이 지날수록 더 많은 데이터를 수집하고 있습니다.

유지보수 DAG를 사용하여 데이터베이스의 콘텐츠를 프루닝할 수 있습니다. 이 DAG는 DagRun, TaskInstance, Log, XCom, Job DB, SlaMiss 테이블에서 이전 항목을 삭제합니다.

데이터베이스 유지보수 DAG

"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
having too much data in your Airflow MetaStore.

## Authors

The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup)

## Usage

1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
  ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values

2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
   dictionary in the list features the following parameters:
    - airflow_db_model: Model imported from airflow.models corresponding to
      a table in the airflow metadata database
    - age_check_column: Column in the model/table to use for calculating max
      date of data deletion
    - keep_last: Boolean to specify whether to preserve last run instance
        - keep_last_filters: List of filters to preserve data from deleting
          during clean-up, such as DAG runs where the external trigger is set to 0.
        - keep_last_group_by: Option to specify column by which to group the
          database entries and perform aggregate functions.

3. Create and Set the following Variables in the Airflow Web Server
  (Admin -> Variables)
    - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
      the log files if not already provided in the conf. If this is set to 30,
      the job will remove those files that are 30 days old or older.

4. Put the DAG in your gcs bucket.
"""
from datetime import datetime, timedelta
import logging
import os

import airflow
from airflow import settings
from airflow.configuration import conf
from airflow.jobs import BaseJob
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
    TaskInstance, Variable, XCom
from airflow.operators.python_operator import PythonOperator
import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import load_only

try:
    # airflow.utils.timezone is available from v1.10 onwards
    from airflow.utils import timezone
    now = timezone.utcnow
except ImportError:
    now = datetime.utcnow

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that arE 30 days old or older.

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowdown process
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.
ENABLE_DELETE = True
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.
DATABASE_OBJECTS = [{
    "airflow_db_model": BaseJob,
    "age_check_column": BaseJob.latest_heartbeat,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagRun,
    "age_check_column": DagRun.execution_date,
    "keep_last": True,
    "keep_last_filters": [DagRun.external_trigger.is_(False)],
    "keep_last_group_by": DagRun.dag_id
}, {
    "airflow_db_model": TaskInstance,
    "age_check_column": TaskInstance.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": Log,
    "age_check_column": Log.dttm,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": XCom,
    "age_check_column": XCom.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": SlaMiss,
    "age_check_column": SlaMiss.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagModel,
    "age_check_column": DagModel.last_scheduler_run,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}]

# Check for TaskReschedule model
try:
    from airflow.models import TaskReschedule
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskReschedule,
        "age_check_column": TaskReschedule.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for TaskFail model
try:
    from airflow.models import TaskFail
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskFail,
        "age_check_column": TaskFail.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for RenderedTaskInstanceFields model
try:
    from airflow.models import RenderedTaskInstanceFields
    DATABASE_OBJECTS.append({
        "airflow_db_model": RenderedTaskInstanceFields,
        "age_check_column": RenderedTaskInstanceFields.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for ImportError model
try:
    from airflow.models import ImportError
    DATABASE_OBJECTS.append({
        "airflow_db_model": ImportError,
        "age_check_column": ImportError.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for celery executor
airflow_executor = str(conf.get("core", "executor"))
logging.info("Airflow Executor: " + str(airflow_executor))
if (airflow_executor == "CeleryExecutor"):
    logging.info("Including Celery Modules")
    try:
        from celery.backends.database.models import Task, TaskSet
        DATABASE_OBJECTS.extend(({
            "airflow_db_model": Task,
            "age_check_column": Task.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }, {
            "airflow_db_model": TaskSet,
            "age_check_column": TaskSet.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }))

    except Exception as e:
        logging.error(e)

session = settings.Session()

default_args = {
    "owner": DAG_OWNER_NAME,
    "depends_on_past": False,
    "email": ALERT_EMAIL_ADDRESSES,
    "email_on_failure": True,
    "email_on_retry": False,
    "start_date": START_DATE,
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE)
if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False

def print_configuration_function(**context):
    logging.info("Loading Configurations...")
    dag_run_conf = context.get("dag_run").conf
    logging.info("dag_run.conf: " + str(dag_run_conf))
    max_db_entry_age_in_days = None
    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get(
            "maxDBEntryAgeInDays", None)
    logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
    if (max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1):
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included or Variable " +
            "value is less than 1. Using Default '" +
            str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'")
        max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
    max_date = now() + timedelta(-max_db_entry_age_in_days)
    logging.info("Finished Loading Configurations")
    logging.info("")

    logging.info("Configurations:")
    logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
    logging.info("max_date:                 " + str(max_date))
    logging.info("enable_delete:            " + str(ENABLE_DELETE))
    logging.info("session:                  " + str(session))
    logging.info("")

    logging.info("Setting max_execution_date to XCom for Downstream Processes")
    context["ti"].xcom_push(key="max_date", value=max_date.isoformat())

print_configuration = PythonOperator(
    task_id="print_configuration",
    python_callable=print_configuration_function,
    provide_context=True,
    dag=dag)

def cleanup_function(**context):

    logging.info("Retrieving max_execution_date from XCom")
    max_date = context["ti"].xcom_pull(
        task_ids=print_configuration.task_id, key="max_date")
    max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom

    airflow_db_model = context["params"].get("airflow_db_model")
    state = context["params"].get("state")
    age_check_column = context["params"].get("age_check_column")
    keep_last = context["params"].get("keep_last")
    keep_last_filters = context["params"].get("keep_last_filters")
    keep_last_group_by = context["params"].get("keep_last_group_by")

    logging.info("Configurations:")
    logging.info("max_date:                 " + str(max_date))
    logging.info("enable_delete:            " + str(ENABLE_DELETE))
    logging.info("session:                  " + str(session))
    logging.info("airflow_db_model:         " + str(airflow_db_model))
    logging.info("state:                    " + str(state))
    logging.info("age_check_column:         " + str(age_check_column))
    logging.info("keep_last:                " + str(keep_last))
    logging.info("keep_last_filters:        " + str(keep_last_filters))
    logging.info("keep_last_group_by:       " + str(keep_last_group_by))

    logging.info("")

    logging.info("Running Cleanup Process...")

    try:
        query = session.query(airflow_db_model).options(
            load_only(age_check_column))

        logging.info("INITIAL QUERY : " + str(query))

        if keep_last:

            subquery = session.query(func.max(DagRun.execution_date))
            # workaround for MySQL "table specified twice" issue
            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
            if keep_last_filters is not None:
                for entry in keep_last_filters:
                    subquery = subquery.filter(entry)

                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))

            if keep_last_group_by is not None:
                subquery = subquery.group_by(keep_last_group_by)
                logging.info(
                    "SUB QUERY [keep_last_group_by]: " +
                    str(subquery))

            subquery = subquery.from_self()

            query = query.filter(
                and_(age_check_column.notin_(subquery)),
                and_(age_check_column <= max_date))

        else:
            query = query.filter(age_check_column <= max_date,)

        if PRINT_DELETES:
            entries_to_delete = query.all()

            logging.info("Query: " + str(query))
            logging.info("Process will be Deleting the following " +
                         str(airflow_db_model.__name__) + "(s):")
            for entry in entries_to_delete:
                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
                logging.info("\tEntry: " + str(entry) + ", Date: " + date)

            logging.info("Process will be Deleting "
                         + str(len(entries_to_delete)) + " "
                         + str(airflow_db_model.__name__) + "(s)")
        else:
            logging.warn(
                "You've opted to skip printing the db entries to be deleted. "
                "Set PRINT_DELETES to True to show entries!!!")

        if ENABLE_DELETE:
            logging.info("Performing Delete...")
            # using bulk delete
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Finished Performing Delete")
        else:
            logging.warn("You've opted to skip deleting the db entries. "
                         "Set ENABLE_DELETE to True to delete entries!!!")

        logging.info("Finished Running Cleanup Process")

    except ProgrammingError as e:
        logging.error(e)
        logging.error(
            str(airflow_db_model) + " is not present in the metadata."
            "Skipping...")

for db_object in DATABASE_OBJECTS:

    cleanup_op = PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,
        provide_context=True,
        dag=dag)

    print_configuration.set_downstream(cleanup_op)

또한 'DAG 삭제' 섹션의 설명대로 더 이상 필요하지 않은 DAG와 관련된 항목을 삭제할 수 있습니다.

제한사항

  • 이전 버전의 Cloud Composer 또는 Airflow로 다운그레이드할 수는 없습니다.
  • 동일한 주 버전 내 최신 Cloud Composer 버전으로만 업그레이드할 수 있습니다(예: composer-1.12.4-airflow-1.10.10에서 composer-1.13.0-airflow-1.10.10으로 업그레이드). composer-1.4.0-airflow-1.10.0에서 composer-2.0.0-airflow-1.10.0으로 업그레이드는 Cloud Composer의 주 버전이 1에서 2로 변경되므로 허용되지 않습니다.
  • 업그레이드 대상 이미지 버전은 사용자 환경의 현재 Python 버전을 지원해야 합니다.
  • Airflow 데이터베이스에 16GB가 넘는 데이터가 포함된 경우 업그레이드를 수행할 수 없습니다. 그럴 경우에는 업그레이드 시 Airflow 데이터베이스 유지보수 섹션에 설명된 대로 데이터베이스 유지보수를 수행해야 한다는 경고 메시지가 표시됩니다.

사용 가능한 업그레이드 보기

업그레이드할 수 있는 Cloud Composer 버전을 보려면 다음 안내를 따르세요.

Console

  1. Google Cloud에서 환경 페이지를 엽니다.

    환경 페이지 열기

  2. 환경 이름을 클릭합니다.

  3. 환경 구성 탭에서 이미지 버전 업그레이드를 클릭합니다.

  4. 사용 가능한 버전을 보려면 Cloud Composer 이미지 버전 드롭다운 메뉴를 클릭합니다.

gcloud

gcloud beta composer environments list-upgrades ENVIRONMENT_NAME \
    --location LOCATION 

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.

예:

gcloud beta composer environments list-upgrades test-environment \
    --location us-central1
┌─────────────────────────────────────────────────────────────────────────────┐
│                              AVAILABLE UPGRADES                             │
├──────────────────────────────┬──────────────────┬───────────────────────────┤
│        IMAGE VERSION         │ COMPOSER DEFAULT │ SUPPORTED PYTHON VERSIONS │
├──────────────────────────────┼──────────────────┼───────────────────────────┤
│ composer-1.4.0-airflow-1.9.0 │ True             │ 2,3                       │
└──────────────────────────────┴──────────────────┴───────────────────────────┘

API

Cloud Composer REST API로 사용 가능한 버전을 보려면 imageVersions.list API 요청을 작성하고 projects/{projectId}/locations/{locationId} 형식으로 프로젝트와 위치를 제공합니다.

예:

GET https://composer.googleapis.com/v1/projects/test-project-id/locations/us-central1/imageVersions

{
  "imageVersions": [
    {
      "imageVersionId": "composer-1.4.2-airflow-1.10.0",
      "supportedPythonVersions": [
        "2",
        "3"
      ]
    },
    {
      "imageVersionId": "composer-1.4.2-airflow-1.9.0",
      "isDefault": true,
      "supportedPythonVersions": [
        "2",
        "3"
      ]
    }
  ]
} 

Cloud Composer 버전 업그레이드

환경에서 실행되는 Cloud Composer 버전을 업그레이드하려면 다음 안내를 따르세요.

Console

  1. Google Cloud에서 환경 페이지를 엽니다.

    환경 페이지 열기

  2. 수정할 환경 이름을 클릭합니다.

  3. 환경 구성 탭에서 이미지 버전 업그레이드를 클릭합니다.

  4. Cloud Composer 이미지 버전 드롭다운 메뉴를 클릭하고 버전을 선택합니다.

  5. 제출을 클릭합니다.

gcloud

gcloud beta composer environments update ENVIRONMENT_NAME \
    --location LOCATION --image-version VERSION

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • VERSION은 개발자 환경에 사용할 Cloud Composer 버전과 Airflow 버전입니다(composer-a.b.c-airflow-x.y.z 또는 composer-a.b.c-airflow-x.y 형식). Airflow 패치를 지정하지 않으면 지정된 주 버전 및 부 버전에 사용 가능한 최신 패치 버전이 사용됩니다.

예:

gcloud beta composer environments update test-environment \
    --location us-central1 --image-version composer-latest-airflow-1.10.1 

API

Cloud Composer REST API를 사용하여 업그레이드하려면 environments.patch API 요청을 작성합니다. 버전composer-a.b.c-airflow-x.y.z 형식으로 제공합니다.

예:

PATCH https://composer.googleapis.com/v1beta1/projects/test-project/locations/us-central1/environments/test-environment?updateMask=config.software_config.image_version

요청 본문에는 imageVersion이 포함됩니다.

{
  "config": {
    "softwareConfig": {
      "imageVersion": "composer-1.6.0-airflow-1.10.1"
    }
  }
}

Airflow 버전 업그레이드

최신 Cloud Composer 버전을 실행하는 환경에서는 Cloud SDK를 사용하여 Airflow 버전만 업그레이드할 수 있습니다(예: composer-1.6.1-airflow-1.9.0에서 composer-1.6.1-airflow-1.10.0으로 업그레이드).

gcloud beta composer environments update ENVIRONMENT_NAME \
--location LOCATION --airflow-version VERSION

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • VERSION은 개발자 환경에서 사용할 x.y.z 또는 x.y 형식의 Airflow 버전입니다.

예:

gcloud beta composer environments update test-environment \
--location us-central1 --airflow-version=1.10.1