Cloud Composer 2 で Apache Airflow DAG を実行する(Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2

このクイックスタート ガイドでは、Cloud Composer 環境を作成し、Cloud Composer 2 で Apache Airflow DAG を実行する方法について説明します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Google Cloud CLI をインストールします。
  7. gcloud CLI を初期化するには:

    gcloud init
  8. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  9. Google Cloud プロジェクトで課金が有効になっていることを確認します

  10. Cloud Composer API を有効にします。

    gcloud services enable composer.googleapis.com
  11. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与の詳細については、アクセスの管理をご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境の作成

これがプロジェクトの最初の環境である場合は、環境のサービス アカウントの新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、roles/composer.ServiceAgentV2Ext ロールを付与します。

デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。次の例は、この権限を追加する方法を示しています。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

最新の Cloud Composer 2 バージョンを使用して、us-central1 リージョンに example-environment という名前の新しい環境を作成します。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.8.1-airflow-2.7.3

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Cloud Composer 環境には、Cloud Storage バケットが関連付けられています。Cloud Composer の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、Google Cloud コンソールで DAG 情報を表示するための Cloud Composer インターフェースです。Cloud Composer は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する dags list Airflow CLI コマンドを実行します。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. コマンドの出力に composer_quickstart DAG が表示されていることを確認します。

    出力例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

Google Cloud CLI で次のコマンドを実行します。次のコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

出力例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI には、タスクログを表示するコマンドはありません。他の方法(Cloud Composer DAG UI、Airflow UI、Cloud Logging)で Airflow タスクログを表示することもできます。このガイドでは、特定の DAG 実行から取得したログを Cloud Logging にクエリする方法について説明します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の特定の DAG 実行のログを Cloud Logging から読み取ります。--format 引数で、ログメッセージのテキストのみが表示されるように出力をフォーマットします。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

以下のように置き換えます。

  • RUN_ID は、以前に実行した tasks states-for-dag-run コマンドの出力の run_id 値に置き換えます。例: 2024-02-17T15:38:38.969307+00:00

出力例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。

      たとえば、このディスクの名前は pvc-02bc4842-2312-4347-8519-d87bdcd31115 です。Cloud Composer 2 のディスクは常に Balanced persistent disk タイプで、サイズ 2 GB です。

次のステップ