# Troubleshooting environment updates and upgrades

**Cloud Composer 3** | [Cloud Composer 2](/composer/docs/composer-2/troubleshooting-updates-upgrades "View this page for Cloud Composer 2") | [Cloud Composer 1](/composer/docs/composer-1/troubleshooting-updates-upgrades "View this page for Cloud Composer 1")

This page provides troubleshooting information for problems that you might
encounter while updating or upgrading Cloud Composer environments.

For troubleshooting information related to creating environments, see
[Troubleshooting environment creation](/composer/docs/composer-3/troubleshooting-environment-creation).

When Cloud Composer environments are updated, most issues happen for the
following reasons:

- Service account permission problems
- PyPI dependency issues
- Size of the Airflow database

Insufficient permissions to update or upgrade an environment
-------------------------------------------------------------

If Cloud Composer can't update or upgrade an environment because of
insufficient permissions, it outputs the following error message:

    ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

**Solution**: Assign roles both to your account and to the service account of
your environment as described in [Access control](/composer/docs/composer-3/access-control).
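For example, if your own account is missing permissions, you can grant it a
Cloud Composer role at the project level. The following is a minimal sketch;
`PROJECT_ID` and `USER_EMAIL` are placeholders, and the Composer Administrator
role shown here is broader than an update strictly requires, so choose the
least privileged role that fits your case:

    # Grant a Cloud Composer role to the account that performs the update.
    # PROJECT_ID and USER_EMAIL are placeholders for your project and account.
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="user:USER_EMAIL" \
        --role="roles/composer.admin"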
The service account of the environment has insufficient permissions
---------------------------------------------------------------------

When creating a Cloud Composer environment, you specify a service account that
performs most of the environment's operations. If this service account doesn't
have enough permissions for the requested operation, then Cloud Composer
outputs an error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

**Solution**: Assign roles to your Google Account and to the service account of
your environment as described in [Access control](/composer/docs/composer-3/access-control).
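For example, the environment's service account typically needs the Composer
Worker role on the project. A minimal sketch, assuming `PROJECT_ID` and
`SA_EMAIL` are placeholders for your project and the environment's service
account email:

    # Grant the Composer Worker role to the environment's service account.
    # PROJECT_ID and SA_EMAIL are placeholders.
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:SA_EMAIL" \
        --role="roles/composer.worker"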
"""When upgrading Airflow to a newer version,it might happen that some data cannot be migrated,often because of constraint changes in the metadata base.This file contains 2 DAGs:1. 'list_moved_tables_after_upgrade_dag' Prints the rows which failed to be migrated.2. 'rename_moved_tables_after_upgrade_dag' Renames the table which contains the failed migrations. This will remove the warning message from airflow."""importdatetimeimportloggingfromairflowimportDAGfromairflow.operators.pythonimportPythonOperatorfromairflow.providers.postgres.hooks.postgresimportPostgresHookfromairflow.settingsimportAIRFLOW_MOVED_TABLE_PREFIXdefget_moved_tables():hook=PostgresHook(postgres_conn_id="airflow_db")returnhook.get_records("SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'")deflist_moved_records():tables=get_moved_tables()ifnottables:logging.info("No moved tables found")returnhook=PostgresHook(postgres_conn_id="airflow_db")forschema,tableintables:df=hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")logging.info(df.to_markdown())defrename_moved_tables():tables=get_moved_tables()ifnottables:returnhook=PostgresHook(postgres_conn_id="airflow_db")forschema,tableintables:hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")withDAG(dag_id="list_moved_tables_after_upgrade_dag",start_date=datetime.datetime(2023,1,1),schedule_interval=None,catchup=False,):t1=PythonOperator(task_id="list_moved_records",python_callable=list_moved_records)withDAG(dag_id="rename_moved_tables_after_upgrade_dag",start_date=datetime.datetime(2023,1,1),schedule_interval=None,catchup=False,)asdag:t1=PythonOperator(task_id="rename_moved_tables",python_callable=rename_moved_tables)
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-08-26。"],[[["\u003cp\u003eThis page addresses common issues encountered during Cloud Composer environment updates or upgrades, including service account permissions, PyPI dependencies, and Airflow database size.\u003c/p\u003e\n"],["\u003cp\u003eInsufficient permissions, either for your account or the environment's service account, can halt updates or upgrades, requiring the assignment of roles as outlined in the Access Control documentation.\u003c/p\u003e\n"],["\u003cp\u003eUpgrades might fail if the Airflow database exceeds 20 GB, necessitating a database cleanup as described in the Clean Up the Airflow Database documentation.\u003c/p\u003e\n"],["\u003cp\u003eConflicts with custom PyPI packages during upgrades can occur due to version mismatches, which can be resolved by loosening version constraints or using the upgrade check tool.\u003c/p\u003e\n"],["\u003cp\u003eAirflow upgrades may result in data migration failures; two provided DAGs allow inspecting and renaming the tables containing this unmigrated data, addressing warning messages in the Airflow UI.\u003c/p\u003e\n"]]],[],null,["# Troubleshooting environment updates and upgrades\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\n**Cloud Composer 3** \\| [Cloud Composer 2](/composer/docs/composer-2/troubleshooting-updates-upgrades \"View this page for Cloud Composer 2\") \\| [Cloud Composer 1](/composer/docs/composer-1/troubleshooting-updates-upgrades \"View this page for Cloud Composer 1\")\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\nThis page provides troubleshooting information for problems that you might\nencounter while updating or upgrading Cloud Composer environments.\n\nFor troubleshooting information related to creating environments, see\n[Troubleshooting environment creation](/composer/docs/composer-3/troubleshooting-environment-creation).\n\nWhen Cloud Composer environments are updated, the majority of issues\nhappen because of the following reasons:\n\n- Service account permission problems\n- PyPI dependency issues\n- Size of the Airflow database\n\nInsufficient permissions to update or upgrade an environment\n------------------------------------------------------------\n\nIf Cloud Composer can't update or upgrade an environment because of\ninsufficient permissions, it outputs the following error message: \n\n ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission\n\n**Solution** : Assign roles to both to your account and to the service account\nof your environment as described in [Access control](/composer/docs/composer-3/access-control).\n\nThe service account of the environment has insufficient permissions\n-------------------------------------------------------------------\n\nWhen creating a Cloud Composer environment, you specify a service\naccount that performs most of the environment's operations. 
Inspect failed migration warnings
---------------------------------

When upgrading Airflow to a later version, sometimes new constraints are
applied to the Airflow database. If these constraints can't be applied,
Airflow creates new tables to store the rows for which the constraints couldn't
be applied. The Airflow UI displays a warning message until the moved data
tables are renamed or dropped.

**Solution**:

You can use the following two DAGs to inspect the moved data and rename the
tables.

The `list_moved_tables_after_upgrade_dag` DAG lists the rows that were moved
from every table where constraints could not be applied. Inspect the data and
decide whether you want to keep it. To keep it, you need to manually fix the
data in the Airflow database, for example by adding the rows back with the
correct data.

If you don't need the data, or if you already fixed it, then you can run the
`rename_moved_tables_after_upgrade_dag` DAG. This DAG renames the moved tables.
The tables and their data are not deleted, so you can review the data at a
later point.
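The DAG file is shown below. One possible way to run it, once it is saved in
your environment's DAGs folder and parsed by Airflow, is to trigger each DAG
from the Airflow UI or with the Airflow CLI through gcloud; for example
(`ENVIRONMENT_NAME` and `LOCATION` are placeholders):

    # Trigger the inspection DAG; repeat with the rename DAG when you are ready.
    # ENVIRONMENT_NAME and LOCATION are placeholders.
    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags trigger -- list_moved_tables_after_upgrade_dag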
    """
    When upgrading Airflow to a newer version,
    it might happen that some data cannot be migrated,
    often because of constraint changes in the metadata database.
    This file contains 2 DAGs:

    1. 'list_moved_tables_after_upgrade_dag'
       Prints the rows which failed to be migrated.
    2. 'rename_moved_tables_after_upgrade_dag'
       Renames the tables which contain the failed migrations. This removes the
       warning message from the Airflow UI.
    """

    import datetime
    import logging

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


    def get_moved_tables():
        # Find the tables that Airflow created for rows it could not migrate.
        hook = PostgresHook(postgres_conn_id="airflow_db")
        return hook.get_records(
            "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
            f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
        )


    def list_moved_records():
        # Log the contents of each moved table so you can inspect the rows.
        tables = get_moved_tables()
        if not tables:
            logging.info("No moved tables found")
            return

        hook = PostgresHook(postgres_conn_id="airflow_db")
        for schema, table in tables:
            df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
            logging.info(df.to_markdown())


    def rename_moved_tables():
        # Rename the moved tables; the data is kept, only the prefix changes.
        tables = get_moved_tables()
        if not tables:
            return

        hook = PostgresHook(postgres_conn_id="airflow_db")
        for schema, table in tables:
            hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


    with DAG(
        dag_id="list_moved_tables_after_upgrade_dag",
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ):
        t1 = PythonOperator(
            task_id="list_moved_records", python_callable=list_moved_records
        )

    with DAG(
        dag_id="rename_moved_tables_after_upgrade_dag",
        start_date=datetime.datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ):
        t1 = PythonOperator(
            task_id="rename_moved_tables", python_callable=rename_moved_tables
        )

What's next
-----------

- [Updating environments](/composer/docs/composer-3/update-environments)
- [Upgrading environments](/composer/docs/composer-3/upgrade-environments)
- [Troubleshooting environment creation](/composer/docs/composer-3/troubleshooting-environment-creation)