Cloud Composer 1 で Apache Airflow DAG を実行する(Google Cloud CLI)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

このクイックスタート ガイドでは、Cloud Composer 環境を作成し、Apache Airflow DAG を Cloud Composer 1 で実行する方法について説明します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  7. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境のサービス アカウントを作成する

環境を作成するときに、サービス アカウントを指定します。このサービス アカウントは、環境のサービス アカウントと呼ばれます。環境でこのサービス アカウントを使用して、ほとんどのオペレーションを実行します。

ご使用の環境のサービス アカウントはユーザー アカウントではありません。サービス アカウントは、ユーザーではなく、アプリケーションや仮想マシン(VM)インスタンスで使用される特別なアカウントです。

環境のサービス アカウントを作成するには:

  1. Identity and Access Management のドキュメントの説明に沿って、新しいサービス アカウントを作成します

  2. Identity and Access Management のドキュメントに記載されているとおりに、ロールを付与します。必要なロールは Composer ワーカーcomposer.worker)です。

環境の作成

最新の Cloud Composer 1 バージョンで、us-central1 リージョンに example-environment という名前の新しい環境を作成します。

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Cloud Composer 環境には、Cloud Storage バケットが関連付けられています。Cloud Composer の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

Google Cloud CLI で quickstart.py をアップロードするには、quickstart.py ファイルがあるフォルダで次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、Google Cloud コンソールで DAG 情報を表示するための Cloud Composer インターフェースです。Cloud Composer は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud CLI で次のコマンドを実行します。このコマンドは、環境内の DAG を一覧表示する dags list Airflow CLI コマンドを実行します。

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. コマンドの出力に composer_quickstart DAG のリストが含まれていることを確認します。

    出力例:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、composer_quickstart DAG の DAG 実行を一覧表示します。

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

出力例:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI には、タスクログを表示するコマンドはありません。Airflow タスクログを表示するには、Cloud Composer DAG UI、Airflow UI、Cloud Logging などの他の方法を使用できます。このガイドでは、特定の DAG 実行のログを Cloud Logging でクエリする方法について説明します。

Google Cloud CLI で次のコマンドを実行します。このコマンドは、Cloud Logging から composer_quickstart DAG の特定の DAG 実行に関するログを読み取ります。--format 引数は、ログメッセージのテキストのみが表示されるように出力を整えます。

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

以下のように置き換えます。

  • RUN_ID は、前に実行した tasks states-for-dag-run コマンドの出力の run_id 値に置き換えます。例: 2024-02-17T15:38:38.969307+00:00

出力例:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

クリーンアップ

このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

次のステップ