Cloud Composer 1 | Cloud Composer 2
このページでは、Cloud Composer が Cloud Storage の環境に保存するデータについて説明します。
環境を作成するときに、Cloud Composer によって Cloud Storage バケットが作成され、環境に関連付けられます。バケットの名前は、環境のリージョン、名前、およびランダムな ID(us-central1-b1-6efannnn-bucket
など)に基づきます。
Cloud Composer は、ワークフロー(DAG)のソースコードとそれらの依存関係を Cloud Storage 内の特定のフォルダに保存し、Cloud Storage FUSE を使用して Cloud Composer 環境の Airflow コンポーネントにフォルダをマッピングします。
Cloud Storage バケット内のフォルダ
フォルダ | Storage パス | マッピングされたディレクトリ | 説明 |
---|---|---|---|
DAG | gs://bucket-name/dags |
/home/airflow/gcs/dags |
環境の DAG を保存します。このフォルダ内の DAG のみが環境にスケジュールされます。 |
プラグイン | gs://bucket-name/plugins |
/home/airflow/gcs/plugins |
カスタム プラグインを保存します。カスタムのインハウス Airflow 演算子、フック、センサー、インターフェースなどです。 |
データ | gs://bucket-name/data |
/home/airflow/gcs/data |
タスクが生成して使用するデータを保存します。このフォルダは、すべてのワーカーノードにマウントされます。 |
ログ | gs://bucket-name/logs |
タスクの Airflow ログを保存します。ログは Airflow ウェブ インターフェースと Cloud Composer UI の [ログ] タブでも使用できます。 |
容量に関する考慮事項
dags/
、plugins/
、data/
のフォルダのデータは、Airflow のスケジューラとワーカーに同期されます。
Airflow 2 では、
plugins/
フォルダの内容も Airflow ウェブサーバーに同期されます。Airflow 1 では、
dags/
とplugins/
のフォルダの内容は、DAG のシリアル化が無効になっている場合にのみ Airflow ウェブサーバーに同期されます。それ以外の場合、同期は実行されません。
これらのフォルダに格納されるデータが多いほど、Airflow コンポーネントのより多くのローカル ストレージ容量が占有されます。dags/
と plugins/
に保存するデータが多すぎると、オペレーションが中断され、次のような問題が発生する可能性があります。
ワーカーまたはスケジューラのローカル ストレージが不足し、コンポーネントのローカル ディスクの容量不足によりワーカーまたはスケジューラが強制排除される
dags/
フォルダとplugins/
フォルダからワーカー、スケジューラへのファイルの同期に時間がかかるdags/
フォルダとplugins/
フォルダからワーカー、スケジューラにファイルを同期できなくなる。たとえば、2 GB のファイルをdags/
フォルダに保存したものの、コンポーネントのローカル ディスクの空き容量は 1 GB のみだとします。この場合、同期中にコンポーネントのローカル ストレージが不足し、同期を完了できません。
DAG とプラグイン
ワークフローの失敗を回避するために、Python モジュールに DAG またはプラグインが含まれていなくても、DAG、プラグイン、Python モジュールを dags/
フォルダまたは plugins/
フォルダに保存します。
たとえば、py_file
データフロー パイプラインを参照する DataFlowPythonOperator
を使用します。その py_file
には DAG やプラグインは含まれていませんが、dags/
または plugins/
フォルダに保存する必要があります。
データ
data/
フォルダの一部のファイルが特定の Airflow コンポーネントと同期される状況も考えられます。たとえば、以下が行われている間に Cloud Composer が特定のファイルを初めて読み込もうとすると、次のようになります。
DAG の解析 DAG の解析中にファイルが初めて読み取られると、Cloud Composer は DAG を解析するスケジューラとファイルを同期します。
DAG の実行 DAG の実行中にファイルが初めて読み取られると、Cloud Composer はその実行を行っているワーカーとファイルを同期します。
Airflow コンポーネントではローカル ストレージが制限されているため、ダウンロードしたファイルを削除してコンポーネントのディスク容量を解放することを検討してください。同じファイルを 1 つの Airflow ワーカーにダウンロードする同時タスクがある場合、ローカル ストレージの使用量も一時的に増加する場合があります。
Logs
logs/
フォルダは、Cloud Storage API を使用して Airflow ワーカーからバケットに同期されます。
Cloud Storage API の割り当ては移動データ量に基づいて計算されるため、システムが実行する Airflow タスクの数によって Cloud Storage API の使用量が増加することがあります。実行するタスクが多いほど、ログファイルが大きくなります。
ウェブサーバー、dags/
、plugins/
、data/
フォルダ
Airflow 2 は、すぐに使用できる DAG のシリアル化を使用します。
plugins/
フォルダは、ウェブサーバーに自動的に同期されるため、Airflow UI でプラグインを読み込むことができます。Airflow 2 で DAG のシリアル化を無効にすることはできません。Airflow 1 では DAG のシリアル化がサポートされており、Cloud Composer でデフォルトで有効になっています。
- DAG のシリアル化が有効になっている場合、
dags/
フォルダとplugins/
フォルダのファイルはウェブサーバーに同期されません。 - DAG のシリアル化を無効にした場合、
dags/
とplugins/
のファイルがウェブサーバーに同期されます。
- DAG のシリアル化が有効になっている場合、
データの同期
Cloud Storage バケットの DAG やプラグインを変更すると、クラスタ内のすべてのノードのデータが同期されます。
Cloud Composer による dags/
フォルダと plugins/
フォルダの同期は、ローカルにコピーすることによって一方向に行われます。一方向に同期されるため、これらのフォルダのローカルの変更は上書きされます。
data/
フォルダと logs/
フォルダの同期は、Cloud Storage FUSE を使用して双方向に行われます。
Cloud Composer 1 と Cloud Composer 2 の違い
Cloud Composer 1 と Cloud Composer 2 では、データ同期を実行するコンポーネントにそれぞれ異なる量の CPU とメモリが割り当てられます。
構成によっては、Cloud Storage バケットと Airflow コンポーネント間の dags/
、plugins/
、data/
フォルダの内容の同期速度が、Cloud Compose 1 と Cloud Composer 2 で異なる場合があります。