Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页面介绍如何连接到运行您的 Cloud Composer 环境的 Airflow 数据库的 Cloud SQL 实例以及如何运行 SQL 查询。
例如,您可能想要直接在 Airflow 数据库上运行查询、进行数据库备份、根据数据库内容收集统计信息,或从数据库中检索任何其他自定义信息。
准备工作
对 Airflow 数据库运行 SQL 查询
如需连接到 Airflow 数据库,请执行以下操作:
创建一个包含一个或多个
PostgresOperator
运算符的 DAG。首先,您可以使用示例 DAG。在运算符的
sql
参数中,指定您的 SQL 查询。将此 DAG 上传到您的环境。
触发 DAG,例如,您可以手动触发,也可以等待其按计划运行。
DAG 示例:
import datetime
import os
import airflow
from airflow.providers.postgres.operators.postgres import PostgresOperator
SQL_DATABASE = os.environ["SQL_DATABASE"]
with airflow.DAG(
"airflow_db_connection_example",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval=None,
catchup=False) as dag:
PostgresOperator(
task_id="run_airflow_db_query",
dag=dag,
postgres_conn_id="airflow_db",
database=SQL_DATABASE,
sql="SELECT * FROM dag LIMIT 10;",
)