importdatetimeimportosimportairflowfromairflow.providers.common.sql.operators.sqlimportSQLExecuteQueryOperatorSQL_DATABASE=os.environ["SQL_DATABASE"]withairflow.DAG("airflow_db_connection_example",start_date=datetime.datetime(2025,1,1),schedule_interval=None,catchup=False)asdag:SQLExecuteQueryOperator(task_id="run_airflow_db_query",dag=dag,conn_id="airflow_db",database=SQL_DATABASE,sql="SELECT * FROM dag LIMIT 10;",)
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-09-01。"],[[["\u003cp\u003eThis page details how to connect to and run SQL queries on the Airflow database of your Cloud Composer 1 environment.\u003c/p\u003e\n"],["\u003cp\u003eDirectly accessing the Airflow database is discouraged; the Airflow REST API or Airflow CLI commands are the recommended alternatives.\u003c/p\u003e\n"],["\u003cp\u003eConnecting to the Airflow database involves creating and uploading a DAG that utilizes the \u003ccode\u003ePostgresOperator\u003c/code\u003e to specify and run the SQL query.\u003c/p\u003e\n"],["\u003cp\u003eAvoid adding custom tables or modifying the schema of the existing Airflow database to prevent complications.\u003c/p\u003e\n"],["\u003cp\u003eBacking up the environment's data should be done with snapshots instead of dumping the database.\u003c/p\u003e\n"]]],[],null,["\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\n[Cloud Composer 3](/composer/docs/composer-3/access-airflow-database \"View this page for Cloud Composer 3\") \\| [Cloud Composer 2](/composer/docs/composer-2/access-airflow-database \"View this page for Cloud Composer 2\") \\| **Cloud Composer 1**\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\nThis page explains how to connect to a Cloud SQL instance that runs\nthe [Airflow database](/composer/docs/composer-1/environment-architecture#airflow-database) of your Cloud Composer\nenvironment and run SQL queries.\n\nFor example, you might want to run queries directly on the Airflow database,\nmake database backups, gather statistics based on the database content, or\nretrieve any other custom information from the database.\n| **Important:** We recommend to avoid directly accessing the Airflow database, if it is possible to use other approaches such as [Airflow REST API](/composer/docs/composer-1/access-airflow-api) or [Airflow CLI commands](/composer/docs/composer-1/access-airflow-cli) instead.\n\nBefore you begin **Warning:** Don't add your own custom tables to the Airflow database and don't change the schema of the Airflow database. Don't add users or databases to the Cloud SQL instance that hosts the Airflow database.\n\nRun a SQL query on the Airflow database\n\nTo connect to the Airflow database:\n\n1. Create a DAG with one or more SQLExecuteQueryOperator operators. To get\n started, you can use the example DAG.\n\n | **Caution:** Your **SQL query might run more than once** because of the DAG schedule and catchup. If you want to run the SQL query only once, set `schedule_interval` to `None`, `catchup` to `False`, and then [trigger the DAG manually](/composer/docs/composer-1/schedule-and-trigger-dags#manually).\n2. In the `sql` parameter of the operator, specify your SQL query.\n\n3. [Upload](/composer/docs/composer-1/manage-dags#add) this DAG to your environment.\n\n4. Trigger the DAG, for example, you can do it\n [manually](/composer/docs/composer-1/schedule-and-trigger-dags#manually) or wait until it runs on a schedule.\n\nExample DAG: \n\n import datetime\n import os\n\n import airflow\n from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator\n\n SQL_DATABASE = os.environ[\"SQL_DATABASE\"]\n\n with airflow.DAG(\n \"airflow_db_connection_example\",\n start_date=datetime.datetime(2025, 1, 1),\n schedule_interval=None,\n catchup=False) as dag:\n\n SQLExecuteQueryOperator(\n task_id=\"run_airflow_db_query\",\n dag=dag,\n conn_id=\"airflow_db\",\n database=SQL_DATABASE,\n sql=\"SELECT * FROM dag LIMIT 10;\",\n )\n\nFor more information about using the SQLExecuteQueryOperator, see the\n[How-to Guide for Postgres using SQLExecuteQueryOperator](https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators.html)\nin the Airflow documentation.\n\nDump database contents and transfer them to a bucket **Deprecated:** This approach is deprecated. Instead, use [snapshots](/composer/docs/composer-1/save-load-snapshots) to back up the environment's data, including the Airflow database contents.\n\nWhat's next\n\n- [Access Airflow REST API](/composer/docs/composer-1/access-airflow-api)\n- [Run Airflow CLI commands](/composer/docs/composer-1/access-airflow-cli)"]]