환경 업데이트 및 업그레이드 문제 해결

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Composer 환경을 업데이트하거나 업그레이드할 때 발생할 수 있는 문제에 대한 문제 해결 정보를 제공합니다.

환경 만들기와 관련된 문제 해결 정보는 환경 만들기 문제 해결을 참조하세요.

Cloud Composer 환경이 업데이트될 때 대부분의 문제는 다음과 같은 이유로 발생합니다.

  • 서비스 계정 권한 문제
  • PyPI 종속 항목 문제
  • Airflow 데이터베이스 크기

환경 업데이트 또는 업그레이드 권한 부족

권한 부족으로 인해 Cloud Composer가 환경을 업데이트하거나 업그레이드할 수 없으면 다음 오류 메시지가 출력됩니다.

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

해결책: 액세스 제어에 설명된 대로 사용자 계정과 사용자 환경의 서비스 계정 모두에 역할을 할당합니다.

환경의 서비스 계정에 권한이 충분하지 않음

Cloud Composer 환경을 만들 때 환경의 GKE 클러스터 노드를 실행하는 서비스 계정을 지정합니다. 이 서비스 계정에 요청된 작업에 대한 권한이 충분하지 않은 경우 Cloud Composer에서 다음 오류를 출력합니다.

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

해결책: 액세스 제어에 설명된 대로 사용자 계정과 사용자 환경의 서비스 계정 모두에 역할을 할당합니다.

Airflow 데이터베이스 크기가 너무 커서 작업을 수행할 수 없음

Airflow 데이터베이스의 크기가 너무 커서 업그레이드 작업이 성공할 수 없기 때문에 Cloud Composer 업그레이드 작업이 성공하지 못할 수 있습니다.

Airflow 데이터베이스 크기가 16GB보다 크면 Cloud Composer가 다음 오류를 출력합니다.

Airflow database uses more than 16 GB. Please clean the database before upgrading.

해결 방법: Airflow 데이터베이스 유지보수에 설명된 대로 Airflow 데이터베이스 정리를 수행합니다.

PyPI 패키지 충돌로 인해 새 Cloud Composer 버전으로 업그레이드할 수 없음

커스텀 PyPI 패키지가 설치된 환경을 업그레이드할 때 PyPI 패키지 충돌과 관련된 오류가 발생할 수 있습니다. 이는 새 Cloud Composer 이미지에 사전 설치된 패키지의 최신 버전이 포함되어 있어 환경에 설치한 PyPI 패키지와 종속 항목이 충돌할 수 있기 때문입니다.

해결책:

  • 패키지 충돌에 대한 자세한 정보를 확인하려면 업그레이드 확인을 실행합니다.
  • 설치된 커스텀 PyPI 패키지의 버전 제약조건을 완화합니다. 예를 들어 버전을 ==1.0.1로 지정하는 대신 >=1.0.1로 지정합니다.
  • 충돌 종속 항목을 해결하기 위한 버전 변경 요구사항에 대한 자세한 내용은 pip 문서를 참조하세요.

DNS 연결이 부족하면 업그레이드 또는 업데이트를 수행하는 동안 문제가 발생할 수 있음

이러한 연결 문제는 다음과 같은 로그 항목을 일으킬 수 있습니다.

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

일반적으로 DNS에 대한 경로가 없기 때문에 metadata.google.internal DNS 이름을 클러스터, 포드, 서비스 네트워크 내의 IP 주소로 확인할 수 있는지 확인합니다. 환경이 생성된 VPC(호스트 또는 서비스 프로젝트) 내에서 비공개 Google 액세스가 사용 설정되어 있는지 확인합니다.

추가 정보

트리거 CPU가 vCPU 1개 한도 초과

버전 2.4.4 이상의 Cloud Composer 2는 성능 확장을 개선하기 위해 다른 트리거 리소스 할당 전략을 도입합니다. 환경 업데이트를 수행할 때 트리거 CPU와 관련된 오류가 발생하면 현재 트리거가 트리거당 2개 이상의 vCPU를 사용하도록 구성된 것입니다.

해결책:

이전 실패 경고 검사

Airflow를 이후 버전으로 업그레이드할 때 Airflow 데이터베이스에 새로운 제약조건이 적용되는 경우가 있습니다. 이러한 제약조건을 적용할 수 없는 경우 Airflow는 해당 제약조건을 적용할 수 없는 행을 저장할 새 테이블을 만듭니다. 이동한 데이터 테이블의 이름이 변경되거나 삭제될 때까지 Airflow UI에 경고 메시지가 표시됩니다.

해결책:

다음 두 DAG를 사용하여 이동한 데이터를 검사하고 테이블의 이름을 바꿀 수 있습니다.

list_moved_tables_after_upgrade_dag DAG는 제약조건을 적용할 수 없는 모든 테이블에서 이동된 행을 나열합니다. 데이터를 검사하여 계속 보관할지 여부를 결정합니다. 계속 보관하려면 Airflow 데이터베이스의 데이터를 수동으로 수정해야 합니다. 예를 들어 올바른 데이터가 포함된 행을 다시 추가합니다.

데이터가 필요하지 않거나 이미 수정한 경우 rename_moved_tables_after_upgrade_dag DAG를 실행할 수 있습니다. 이 DAG는 이동한 테이블의 이름을 바꿉니다. 테이블과 해당 데이터는 삭제되지 않으므로 나중에 데이터를 검토할 수 있습니다.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX

def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )

def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())

def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")

with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

다음 단계