Cloud Composer 2 で Apache Airflow DAG を実行する(Google Cloud CLI)
Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このクイックスタート ガイドでは、Cloud Composer 環境を作成し、Apache Airflow DAG を Cloud Composer 2 で実行する方法について説明します。
Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用状況の詳細について、Apache Airflow ドキュメントの Airflow のコンセプト チュートリアルをご覧ください。
代わりに Google Cloud コンソールを使用する場合は、Cloud Composer で Apache Airflow DAG を実行するをご覧ください。
Terraform を使用して環境を作成する場合は、環境を作成する(Terraform)をご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。
-
Cloud Composer 環境を表示、作成および管理するには:
-
環境とストレージ オブジェクトの管理者(
roles/composer.environmentAndStorageObjectAdmin
) -
サービス アカウント ユーザー(
roles/iam.serviceAccountUser
)
-
環境とストレージ オブジェクトの管理者(
-
ログを表示するには: ログビューア(
roles/logging.viewer
)
ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。
-
Cloud Composer 環境を表示、作成および管理するには:
環境の作成
これがプロジェクトの最初の環境の場合は、Cloud Composer サービス エージェント アカウントを新しいプリンシパルとして環境のサービス アカウントに追加して roles/composer.ServiceAgentV2Ext
ロールを付与します。
デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。また、次の例では、この権限を追加する方法を示します。
# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
--filter="$(gcloud config get-value project)" \
--format="value(PROJECT_NUMBER)" \
--limit=1)
# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
$PROJECT_NUMBER-compute@developer.gserviceaccount.com \
--member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
--role roles/composer.ServiceAgentV2Ext
最新の Cloud Composer 2 バージョンで、us-central1
リージョンに example-environment
という名前の新しい環境を作成します。
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-2.9.7-airflow-2.9.3
DAG ファイルを作成する
Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。
このガイドでは、quickstart.py
ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。
- DAG(
composer_sample_dag
)を作成します。この DAG は毎日実行されます。 - タスク(
print_dag_run_conf
)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。
ローカルマシンに quickstart.py
ファイルのコピーを保存します。
DAG ファイルを環境のバケットにアップロードする
すべての Cloud Composer 環境には、Cloud Storage バケットが関連付けられています。Cloud Composer の Airflow は、このバケットの /dags
フォルダにある DAG のみをスケジュール設定します。
DAG のスケジュールを設定するには、quickstart.py
をローカルマシンから環境の /dags
フォルダにアップロードします。
Google Cloud CLI で quickstart.py
をアップロードするには、quickstart.py
ファイルがあるフォルダで次のコマンドを実行します。
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
DAG を表示する
DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。
- アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
- DAG を使用可能な DAG のリストに追加します。
- DAG ファイルで指定したスケジュールに沿って DAG を実行します。
DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、Google Cloud コンソールで DAG 情報を表示するための Cloud Composer インターフェースです。Cloud Composer は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。
以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する
dags list
Airflow CLI コマンドを実行します。gcloud composer environments run example-environment \ --location us-central1 \ dags list
コマンドの出力に
composer_quickstart
DAG が表示されていることを確認します。出力例:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
DAG 実行の詳細を表示する
DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。
サンプル DAG には、コンソールで echo
コマンドを実行する 1 つのタスク print_dag_run_conf
が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart
DAG の DAG 実行を一覧表示します。
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
出力例:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Airflow CLI には、タスクログを表示するコマンドはありません。他の方法(Cloud Composer DAG UI、Airflow UI、Cloud Logging)を使用して Airflow タスクログを表示することもできます。このガイドでは、特定の DAG 実行のログを Cloud Logging でクエリする方法を説明します。
Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart
DAG の特定の DAG 実行に関するログを読み取ります。--format
引数は、ログメッセージのテキストのみが表示されるように出力を整えます。
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
以下のように置き換えます。
RUN_ID
は、前に実行したtasks states-for-dag-run
コマンドの出力のrun_id
値に置き換えます。例:2024-02-17T15:38:38.969307+00:00
出力例:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。
このチュートリアルで使用したリソースを削除します。
Cloud Composer 環境を削除します。
Google Cloud Console で [環境] ページに移動します。
[
example-environment
] を選択し、[削除] をクリックします。環境が削除されるまで待ちます。
環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。
環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を
us-central1-example-environ-c1616fe8-bucket
にします。
環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。
Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。
環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。
たとえば、このディスクの名前は
pvc-02bc4842-2312-4347-8519-d87bdcd31115
です。Cloud Composer 2 のディスクは常にBalanced persistent disk
タイプで、サイズ 2 GB です。
次のステップ