環境の更新とアップグレードのトラブルシューティング

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer 環境の更新中またはアップグレード中に発生する可能性がある問題のトラブルシューティングについて説明します。

環境の作成に関連するトラブルシューティング情報については、環境作成のトラブルシューティングをご覧ください。

Cloud Composer 環境が更新される際、ほとんどの問題は次の理由で発生します。

  • サービス アカウントの権限の問題
  • PyPI の依存関係に関する問題
  • Airflow データベースのサイズ

環境を更新またはアップグレードする権限がありません

権限不足により Cloud Composer で環境を更新またはアップグレードできない場合、次のエラー メッセージが表示されます。

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解決策: アクセス制御での説明に沿って、アカウントと環境のサービス アカウントの両方にロールを割り当てます。

環境のサービス アカウントに十分な権限がありません

Cloud Composer 環境を作成するときに、その環境の GKE クラスタノードを実行するサービス アカウントを指定します。リクエストされたオペレーションを実行するための十分な権限がアカウントにない場合、Cloud Composer はエラーを出力します。

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解決策: アクセス制御での説明に沿って、アカウントと環境のサービス アカウントの両方にロールを割り当てます。

Airflow データベースのサイズが大きすぎて操作を実行できません

Airflow データベースのサイズが大きすぎてアップグレード操作を正常に処理できないため、Cloud Composer のアップグレード オペレーションが正常に実行されない場合があります。

Airflow データベースのサイズが 16 GB を超えると、Cloud Composer は次のエラーを出力します。

Airflow database uses more than 16 GB. Please clean the database before upgrading.

解決策: Airflow データベースのメンテナンスの説明に従って、Airflow データベース クリーンアップを実行します。

PyPI パッケージの競合が原因で新しい Cloud Composer バージョンへのアップグレードが失敗する

インストールされたカスタム PyPI パッケージを使用して環境をアップグレードすると、PyPI パッケージの競合に関連するエラーが発生することがあります。これは、新しい Cloud Composer イメージに新しいバージョンのプリインストール済みパッケージが含まれているため、環境にインストールした PyPI パッケージと依存関係の競合が発生するためです。

解決策:

  • パッケージの競合に関する詳細情報を取得するには、アップグレード チェックを実行します。
  • インストール済みのカスタム PyPI パッケージのバージョン制約を緩和します。たとえば、バージョンを ==1.0.1 ではなく >=1.0.1 として指定します。
  • 競合する依存関係を解決するためにバージョン要件を変更する方法について詳しくは、pip のドキュメントをご覧ください。

DNS に接続されていないと、アップグレードや更新中に問題が発生する可能性がある

このような接続の問題により、次のようなログエントリが発生する可能性があります。

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

通常は DNS へのルートがないことを意味しているため、cluster.google.internal DNS 名をクラスタ、Pod、サービス ネットワーク内から IP アドレスに解決できるようにします。環境が作成されている VPC 内(ホストまたはサービス プロジェクト)で、限定公開の Google アクセスが有効になっているかどうかを確認してください。

詳細情報:

triggerer CPU が 1 vCPU の上限を超えている

Cloud Composer 2 バージョン 2.4.4 以降では、パフォーマンスのスケーリングを改善する別の triggerer リソース割り当て戦略が導入されています。環境の更新の実行中に triggerer CPU に関連するエラーが発生した場合は、triggerer 1 つにつき複数の vCPU を使用するように現在の triggerer が構成されているということです。

解決策:

失敗した移行の警告を調べる

Airflow を新しいバージョンにアップグレードすると、新しい制約が Airflow データベースに適用されることがあります。これらの制約を適用できない場合は、Airflow は制約を適用できなかった行を格納する新しいテーブルを作成します。移動したデータテーブルの名前が変更されるか、削除されるまで、Airflow UI には警告メッセージが表示されます。

解決策:

次の 2 つの DAG を使用すると、移動したデータを検査し、テーブルの名前を変更できます。

list_moved_tables_after_upgrade_dag DAG には、制約を適用できなかったすべてのテーブルから移動された行が一覧表示されます。データを検査し、保持するかどうかを決定します。保持するには、Airflow データベース内のデータを手動で修正する必要があります。たとえば、正しいデータで行を再度追加します。

データが不要な場合や、すでに修正されている場合は、rename_moved_tables_after_upgrade_dag DAG を実行できます。この DAG は、移動されたテーブルの名前を変更します。テーブルとそのデータは削除されません。後でデータを確認できます。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX

def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )

def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())

def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")

with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

次のステップ