Cloud Composer 2 で Apache Airflow DAG を実行する(Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このクイックスタート ガイドでは、Cloud Composer 環境を作成し、Apache Airflow DAG を Cloud Composer 2 で実行する方法について説明します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Install the Google Cloud CLI.
  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境のサービス アカウントを作成する

環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境では、このサービス アカウントを使用してほとんどのオペレーションを実行します。

ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。

環境のサービス アカウントを作成するには:

  1. Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します

  2. Identity and Access Management のドキュメントに記載されているように、ロールを付与します。必要なロールは Composer ワーカーcomposer.worker)です。

環境の作成

これがプロジェクトの最初の環境の場合は、Cloud Composer サービス エージェント アカウントを新しいプリンシパルとして環境のサービス アカウントに追加して roles/composer.ServiceAgentV2Ext ロールを付与します。

デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。また、次の例では、この権限を追加する方法を示します。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

最新の Cloud Composer 2 バージョンで、us-central1 リージョンに example-environment という名前の新しい環境を作成します。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.10.0-airflow-2.10.2

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Cloud Composer 環境には、Cloud Storage バケットが関連付けられています。Cloud Composer の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、Google Cloud コンソールで DAG 情報を表示するための Cloud Composer インターフェースです。Cloud Composer は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する dags list Airflow CLI コマンドを実行します。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. コマンドの出力に composer_quickstart DAG のリストが含まれていることを確認します。

    出力例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

出力例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI には、タスクログを表示するコマンドはありません。Airflow タスクログを表示するには、Cloud Composer DAG UI、Airflow UI、Cloud Logging などの他の方法を使用できます。このガイドでは、特定の DAG 実行のログを Cloud Logging でクエリする方法について説明します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

以下のように置き換えます。

  • RUN_ID は、前に実行した tasks states-for-dag-run コマンドの出力の run_id 値に置き換えます。例: 2024-02-17T15:38:38.969307+00:00

出力例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud コンソールで、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud コンソールで、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。

      たとえば、このディスクの名前は pvc-02bc4842-2312-4347-8519-d87bdcd31115 です。Cloud Composer 2 のディスクは常に Balanced persistent disk タイプで、サイズ 2 GB です。

次のステップ