**Cloud Composer 3** | [Cloud Composer 2](/composer/docs/composer-2/troubleshooting-updates-upgrades "View this page for Cloud Composer 2") | [Cloud Composer 1](/composer/docs/composer-1/troubleshooting-updates-upgrades "View this page for Cloud Composer 1")

This page provides troubleshooting information for problems that you might encounter while updating or upgrading Cloud Composer environments.

For troubleshooting information related to creating environments, see [Troubleshooting environment creation](/composer/docs/composer-3/troubleshooting-environment-creation).

When Cloud Composer environments are updated, the majority of issues happen because of the following reasons:

- Service account permission problems
- PyPI dependency issues
- Size of the Airflow database
Insufficient permissions to update or upgrade an environment

If Cloud Composer can't update or upgrade an environment because of insufficient permissions, it outputs the following error message:

    ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

**Solution**: Assign roles both to your account and to the service account of your environment as described in [Access control](/composer/docs/composer-3/access-control).
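For example, the following is a minimal sketch of granting your own account a role that permits environment updates. The project ID, the email address, and the choice of the `roles/composer.environmentAndStorageObjectAdmin` role are placeholders; pick the role that matches your setup as described in Access control:

    gcloud projects add-iam-policy-binding example-project \
        --member="user:user@example.com" \
        --role="roles/composer.environmentAndStorageObjectAdmin"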
The service account of the environment has insufficient permissions

When creating a Cloud Composer environment, you specify a service account that performs most of the environment's operations. If this service account doesn't have enough permissions for the requested operation, then Cloud Composer outputs an error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

**Solution**: Assign roles to your Google Account and to the service account of your environment as described in [Access control](/composer/docs/composer-3/access-control).
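As a sketch, you can check which service account your environment uses and grant it the worker role. The environment name, location, project, and service account email are placeholders, and the `config.nodeConfig.serviceAccount` field path is an assumption based on the Composer API; verify it against your environment's description:

    gcloud composer environments describe example-environment \
        --location us-central1 \
        --format="value(config.nodeConfig.serviceAccount)"

    gcloud projects add-iam-policy-binding example-project \
        --member="serviceAccount:example-sa@example-project.iam.gserviceaccount.com" \
        --role="roles/composer.worker"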
The size of the Airflow database is too big to perform the operation

An upgrade operation might fail because the Airflow database is too large for the upgrade to complete.

If the size of the Airflow database is more than 20 GB, Cloud Composer outputs the following error:

    Airflow database uses more than 20 GB. Please clean the database before upgrading.

**Solution**: Perform the Airflow database cleanup, as described in [Clean up the Airflow database](/composer/docs/composer-3/cleanup-airflow-database).
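Before cleaning up, you might want to see where the space goes. The following DAG is a minimal sketch in the same style as the DAGs shown later on this page; the `report_airflow_db_size_dag` DAG ID is illustrative and not part of Cloud Composer:

    import datetime
    import logging

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    def report_db_size():
        hook = PostgresHook(postgres_conn_id="airflow_db")
        # Total size of the Airflow metadata database.
        size = hook.get_first(
            "SELECT pg_size_pretty(pg_database_size(current_database()))"
        )
        logging.info("Airflow database size: %s", size[0])
        # The ten largest tables, to show where a cleanup would help most.
        for name, table_size in hook.get_records(
            "SELECT relname, pg_size_pretty(pg_total_relation_size(oid))"
            " FROM pg_class"
            " WHERE relkind = 'r' AND relnamespace = 'public'::regnamespace"
            " ORDER BY pg_total_relation_size(oid) DESC LIMIT 10"
        ):
            logging.info("%s: %s", name, table_size)


    with DAG(
        dag_id="report_airflow_db_size_dag",
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ):
        PythonOperator(task_id="report_db_size", python_callable=report_db_size)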
An upgrade to a new Cloud Composer version fails because of PyPI package conflicts

When you upgrade an environment with [installed custom PyPI packages](/composer/docs/composer-3/install-python-dependencies), you might encounter errors related to PyPI package conflicts. This might happen because the new Airflow build contains later versions of preinstalled packages, which can conflict with the PyPI packages that you installed in your environment.

**Solution**:

- To get detailed information about package conflicts, run an [upgrade check](/composer/docs/composer-3/upgrade-environments#upgrade-check).
- Loosen version constraints for installed custom PyPI packages. For example, instead of specifying a version as `==1.0.1`, specify it as `>=1.0.1`; see the sketch after this list.
- For more information about changing version requirements to resolve conflicting dependencies, see the [pip documentation](https://pip.pypa.io/en/stable/topics/dependency-resolution).
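For example, the following is a sketch of loosening a pinned version with gcloud; the environment name, location, and `example-package` are placeholders:

    gcloud composer environments update example-environment \
        --location us-central1 \
        --update-pypi-package "example-package>=1.0.1"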
Inspect failed migration warnings

When upgrading Airflow to a later version, sometimes new constraints are applied to the Airflow database. If these constraints can't be applied, Airflow creates new tables to store the rows for which the constraints couldn't be applied. The Airflow UI displays a warning message until the moved data tables are renamed or dropped.

**Solution**:

You can use the following two DAGs to inspect the moved data and rename the tables.

The `list_moved_tables_after_upgrade_dag` DAG lists rows that were moved from every table where constraints could not be applied. Inspect the data and decide whether you want to keep it. To keep it, you need to manually fix the data in the Airflow database, for example, by adding the rows back with the correct data.

If you don't need the data or if you already fixed it, then you can run the `rename_moved_tables_after_upgrade_dag` DAG. This DAG renames the moved tables. The tables and their data are not deleted, so you can review the data at a later point.

    """
    When upgrading Airflow to a newer version,
    it might happen that some data cannot be migrated,
    often because of constraint changes in the metadata base.
    This file contains 2 DAGs:

    1. 'list_moved_tables_after_upgrade_dag'
       Prints the rows which failed to be migrated.
    2. 'rename_moved_tables_after_upgrade_dag'
       Renames the table which contains the failed migrations.
       This will remove the warning message from airflow.
    """

    import datetime
    import logging

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


    def get_moved_tables():
        hook = PostgresHook(postgres_conn_id="airflow_db")
        return hook.get_records(
            "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
            f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
        )


    def list_moved_records():
        tables = get_moved_tables()
        if not tables:
            logging.info("No moved tables found")
            return

        hook = PostgresHook(postgres_conn_id="airflow_db")
        for schema, table in tables:
            df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
            logging.info(df.to_markdown())


    def rename_moved_tables():
        tables = get_moved_tables()
        if not tables:
            return

        hook = PostgresHook(postgres_conn_id="airflow_db")
        for schema, table in tables:
            hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


    with DAG(
        dag_id="list_moved_tables_after_upgrade_dag",
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ):
        t1 = PythonOperator(
            task_id="list_moved_records", python_callable=list_moved_records
        )

    with DAG(
        dag_id="rename_moved_tables_after_upgrade_dag",
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        t1 = PythonOperator(
            task_id="rename_moved_tables", python_callable=rename_moved_tables
        )
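To run these DAGs, save the file to your environment's `/dags` folder and trigger them from the Airflow UI or through the Airflow CLI. The following is a sketch with a placeholder environment name and location:

    gcloud composer environments run example-environment \
        --location us-central1 \
        dags trigger -- list_moved_tables_after_upgrade_dag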
What's next

- [Updating environments](/composer/docs/composer-3/update-environments)
- [Upgrading environments](/composer/docs/composer-3/upgrade-environments)
- [Troubleshooting environment creation](/composer/docs/composer-3/troubleshooting-environment-creation)