# Access the Airflow database

[Cloud Composer 3](/composer/docs/composer-3/access-airflow-database "View this page for Cloud Composer 3") \| **Cloud Composer 2** \| [Cloud Composer 1](/composer/docs/composer-1/access-airflow-database "View this page for Cloud Composer 1")

Last updated (UTC): 2025-08-29.

This page explains how to connect to the Cloud SQL instance that runs
the [Airflow database](/composer/docs/composer-2/environment-architecture#airflow-database) of your Cloud Composer
environment and run SQL queries.

For example, you might want to run queries directly on the Airflow database,
make database backups, gather statistics based on the database content, or
retrieve any other custom information from the database.

| **Important:** We recommend that you avoid accessing the Airflow database directly if you can use other approaches instead, such as the [Airflow REST API](/composer/docs/composer-2/access-airflow-api) or [Airflow CLI commands](/composer/docs/composer-2/access-airflow-cli).

Before you begin
----------------

| **Warning:** Don't add your own custom tables to the Airflow database and don't change the schema of the Airflow database. Don't add users or databases to the Cloud SQL instance that hosts the Airflow database.

Run a SQL query on the Airflow database
---------------------------------------

To run a SQL query on the Airflow database:

1. Create a DAG with one or more `SQLExecuteQueryOperator` operators. To get
   started, you can use the example DAG.

   | **Caution:** Your **SQL query might run more than once** because of the DAG schedule and catchup. If you want to run the SQL query only once, set `schedule_interval` to `None`, `catchup` to `False`, and then [trigger the DAG manually](/composer/docs/composer-2/schedule-and-trigger-dags#manually).
2. In the `sql` parameter of the operator, specify your SQL query.

3. [Upload](/composer/docs/composer-2/manage-dags#add) this DAG to your environment.

4. Trigger the DAG. For example, you can trigger it
   [manually](/composer/docs/composer-2/schedule-and-trigger-dags#manually) or wait until it runs on a schedule.

Example DAG:

    import datetime
    import os

    import airflow
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    # Database name, read from the SQL_DATABASE environment variable.
    SQL_DATABASE = os.environ["SQL_DATABASE"]

    # No schedule and no catchup: the query runs only when the DAG is triggered.
    with airflow.DAG(
        "airflow_db_connection_example",
        start_date=datetime.datetime(2025, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        SQLExecuteQueryOperator(
            task_id="run_airflow_db_query",
            dag=dag,
            conn_id="airflow_db",
            database=SQL_DATABASE,
            # Replace this statement with your own SQL query.
            sql="SELECT * FROM dag LIMIT 10;",
        )

For more information about using the `SQLExecuteQueryOperator`, see the
[How-to Guide for Postgres using SQLExecuteQueryOperator](https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators.html)
in the Airflow documentation.

Dump database contents and transfer them to a bucket
----------------------------------------------------

| **Deprecated:** This approach is deprecated. Instead, use [snapshots](/composer/docs/composer-2/save-load-snapshots) to back up the environment's data, including the Airflow database contents.

What's next
-----------

- [Access Airflow REST API](/composer/docs/composer-2/access-airflow-api)
- [Run Airflow CLI commands](/composer/docs/composer-2/access-airflow-cli)
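
The caution about the DAG schedule and catchup in "Run a SQL query on the Airflow database" can be made concrete with a small sketch. The following plain-Python model is an illustration only: `expected_runs` is a hypothetical helper, not part of Airflow, and it assumes a fixed-length schedule interval and a DAG that is first enabled at `now`. It estimates how many runs, and therefore how many executions of the SQL query, would start right away.

```python
import datetime
from typing import Optional


def expected_runs(
    start_date: datetime.datetime,
    now: datetime.datetime,
    interval: Optional[datetime.timedelta],
    catchup: bool,
) -> int:
    """Estimate the runs started when a DAG is first enabled at `now`.

    Simplified model (an assumption, not Airflow's scheduler logic).
    """
    # With no schedule, nothing runs until the DAG is triggered manually.
    if interval is None:
        return 0
    # A run is created only after its schedule interval has fully elapsed.
    completed = max(0, (now - start_date) // interval)
    # Without catchup, only the most recent completed interval gets a run.
    return completed if catchup else min(completed, 1)


start = datetime.datetime(2025, 1, 1)
now = datetime.datetime(2025, 1, 11)
daily = datetime.timedelta(days=1)

print(expected_runs(start, now, daily, catchup=True))   # 10
print(expected_runs(start, now, daily, catchup=False))  # 1
print(expected_runs(start, now, None, catchup=False))   # 0
```

Under these assumptions, a daily schedule enabled ten days after `start_date` would run the query ten times with catchup enabled, which is why the example DAG sets `schedule_interval=None` and `catchup=False`.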