Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁說明如何管理環境中的 Airflow 連線,以及如何從 DAG 存取這些連線。
關於 Airflow 連線
Aiflow 連線會儲存憑證和其他連線資訊,例如使用者名稱、連線字串和密碼。DAG 會使用連線與 Google Cloud 和其他服務中的資源通訊及存取資源。
DAG 中的 Airflow 運算子會使用運算子的預設連線,或是您指定的自訂連線名稱。
關於連線安全性
大多數 Airflow 運算子不會直接接受憑證。而是使用 Airflow 連線。
建立新環境時,Cloud Composer 會為該環境產生專屬的永久 Fernet 金鑰,並預設保護連線額外資訊。您可以在 Airflow UI 的「Configuration」(設定) 頁面中查看 fernet_key
。
如要進一步瞭解 Airflow 如何保護連線和密碼,請參閱 Airflow 說明文件中的「保護連線」和「遮蓋機密資料」一節。
關於連線類型
Airflow 會使用不同類型的連線連至特定服務。 舉例來說,Google Cloud 連線類型會連線至 Google Cloud中的其他服務。以另一個例子來說,S3 連線類型會連線至 Amazon S3 bucket。
如要在 Airflow 中新增連線類型,請安裝含有該連線類型的 PyPI 套件。部分套件已預先安裝在您的環境中。舉例來說,您可以使用 apache-airflow-providers-google
套件的連線,而不必安裝自訂 PyPI 套件。
預先設定的連線
Cloud Composer 會在環境中設定下列預設連線。您可以使用這些連線存取專案中的資源,不必進行設定。
google_cloud_default
bigquery_default
google_cloud_datastore_default
google_cloud_storage_default
在 Secret Manager 中新增連線
您可以將連線儲存在 Secret Manager 中,不必新增至 Airflow。建議您在儲存憑證和其他私密資訊時,採用這種做法。
如要在 Secret Manager 中新增連線,請按照下列步驟操作:
新增密鑰,名稱須符合連線的模式。
例如:
airflow-connections-example_connection
。在 DAG 中,使用不含前置字元的連線名稱:example_connection
。新增連線的參數:
JSON 格式
將連線的 JSON 表示法新增為密鑰的值。例如:
{ "conn_type": "mysql", "host": "example.com", "login": "login", "password": "password", "port": "9000" }
如要進一步瞭解 JSON 連線格式,請參閱 Airflow 說明文件。
URI 格式
將連線的 URI 表示法新增為密碼的值:
密鑰必須儲存連線的 URI 表示形式。例如:
mysql://login:password@example.com:9000
。URI 必須經過網址編碼。舉例來說,如果密碼含有空格符號,就必須進行網址編碼,如下所示:
mysql://login:secret%20password@example.com:9000
。
Airflow 提供便利方法,可產生連線 URI。如要瞭解如何使用 JSON 額外資訊編碼複雜網址,請參閱 Airflow 說明文件。
確認所有連線參數都從 Secret Manager 正確讀取。
在 Airflow 中新增連線
除了將連線儲存在 Secret Manager 中,您也可以將連線儲存在 Airflow 中。
如要在 Airflow 中新增連線,請按照下列步驟操作:
Airflow CLI
使用 Google Cloud CLI 執行 connections add
Airflow CLI 指令。例如:
在 Airflow 2 中:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-type "mysql" \
--conn-host "example.com" \
--conn-port "9000" \
--conn-login "login" \
--conn-password "password" \
example_connection
您也可以使用 --conn-uri
引數:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections add -- \
--conn-uri "mysql://login:password@example.com:9000" \
example_connection
在 Airflow 1 中:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections -- \
--add \
--conn_type "mysql" \
--conn_host "example.com" \
--conn_port "9000" \
--conn_login "login" \
--conn_password "password" \
--conn_id "example_connection"
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。
Airflow UI
請按照 Airflow 說明文件中的連線建立方式操作。
確認 Airflow 能正確讀取連線
您可以透過 Google Cloud CLI 執行 connections get
Airflow CLI 指令,確認系統是否正確讀取連線。舉例來說,如果您將連線儲存在 Secret Manager 中,可以透過這項功能檢查 Airflow 是否從密碼讀取連線的所有參數。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
connections get \
-- CONNECTION_NAME
取代:
- 將
ENVIRONMENT_NAME
替換為環境的名稱。 LOCATION
改成環境所在的地區。CONNECTION_NAME
替換為連線名稱。如果連線儲存在 Secret Manager 中,請使用不含連線前置字元的連線名稱。舉例來說,請指定example_connection
,而不是airflow-connections-example_connection_json
。
範例:
gcloud composer environments run example-environment \
--location us-central1 \
connections get \
-- example_connection -o json
在 DAG 中使用 Airflow 連線
本節說明如何從 DAG 存取連線。
使用 Secret Manager 連線
使用不含前置字元的連線名稱,舉例來說,如果您的密鑰名為 airflow-connections-aws_s3
,請指定 aws_s3
。
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
如果您將預設連線儲存在 Secret Manager 中,可以省略連線名稱。如要瞭解運算子使用的預設連線名稱,請參閱特定運算子的 Airflow 說明文件。舉例來說,S3ToGCSOperator
Airflow 運算子預設會使用 aws_default
連線。您可以將這個預設連線儲存在名為「airflow-connections-aws_default
」的密碼中。
使用儲存在 Airflow 中的連線
使用 Airflow 中定義的連線名稱:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f4221-bucket/data/from-s3/')
如要為運算子使用預設連線,請省略連線名稱。如要瞭解運算子使用的預設連線名稱,請參閱特定運算子的 Airflow 說明文件。舉例來說,S3ToGCSOperator
Airflow 運算子預設會使用 aws_default
連線。
疑難排解
如果您的環境無法存取 Secret Manager 中儲存的密鑰:
請確認環境中已設定 Secret Manager。
確認 Secret Manager 中的連線名稱與 Airflow 使用的連線相符。舉例來說,如果連線名稱為「
example_connection
」,密鑰名稱就是「airflow-connections-example_connection
」。確認 Airflow 正確讀取連線。