DAG(ワークフロー)のトリガー

このページでは、Cloud Functions を使用して、イベントベースの DAG トリガーを実現する方法について説明します。

Airflow では定期的なスケジュールで DAG が実行されるように設計されています。一方、イベント(Cloud Storage バケットでの変更や Cloud Pub/Sub に push されたメッセージなど)に応答して DAG をトリガーすることもできます。これを実現するには、Cloud Functions によって Cloud Composer DAG をトリガーします。

このガイドの例では、Cloud Storage バケットで変更が生じるたびに DAG を実行します。その際、オブジェクト変更のメタデータが DAG の構成に渡されます。

プロジェクトの API を有効にする

Google Cloud Functions、Cloud Identity、Google Identity and Access Management(IAM) API を有効にします。

APIを有効にする

Cloud Functions サービス アカウントへ BLOB 署名権限を付与する

Cloud IAP に対する認証をするには、Appspot サービス アカウント(Cloud Functions で使用される)自体に Service Account Token Creator 役割を付与します。これを行うには、gcloud コマンドライン ツールや Cloud Shell で次のコマンドを実行します。

gcloud iam service-accounts add-iam-policy-binding \
your-project-id@appspot.gserviceaccount.com \
--member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

クライアント ID を取得する

Cloud IAP に対する認証のためのトークンを作成するには、Airflow ウェブサーバーを保護するプロキシのクライアント ID が必要です。Cloud Composer API はこの情報を直接提供しません。そのため、Airflow ウェブサーバーに未認証リクエストを送信し、リダイレクト URL からクライアント ID を取得します。次の Python コードサンプルは、クライアント ID の取得方法を示しています。

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])

関数を作成する

下記の index.jspackage.json で、最初の 4 つの定数を入力して関数を作成します(関数の作成をご覧ください)。[失敗時に再試行する] を有効にします。

作成が終了したら、関数は次の画像のようになります。

index.js

'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} event The Cloud Functions event.
 * @returns {Promise}
 */
exports.triggerDag = function triggerDag (event) {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {'conf': JSON.stringify(event.data)};

  // Make the request
  return authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT)
    .then(function iapAuthorizationCallback (iap) {
      return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT, iap.jwt);
    });
};

/**
   * @param {string} clientId The client id associated with the Composer webserver application.
   * @param {string} projectId The id for the project containing the Cloud Function.
   * @param {string} userAgent The user agent string which will be provided with the webserver request.
   */
function authorizeIap (clientId, projectId, userAgent) {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(JSON.stringify({alg: 'RS256', typ: 'JWT'}))
    .toString('base64');

  var jwt = '';
  var jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  return fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'}
    })
    .then(res => res.json())
    .then(function obtainAccessTokenCallback (tokenResponse) {
      if (tokenResponse.error) {
        return Promise.reject(tokenResponse.error);
      }
      var accessToken = tokenResponse.access_token;
      var iat = Math.floor(new Date().getTime() / 1000);
      var claims = {
        iss: SERVICE_ACCOUNT,
        aud: 'https://www.googleapis.com/oauth2/v4/token',
        iat: iat,
        exp: iat + 60,
        target_audience: clientId
      };
      jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
      var toSign = [JWT_HEADER, jwtClaimset].join('.');

      return fetch(
        `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
        {
          method: 'POST',
          body: JSON.stringify({'bytesToSign': Buffer.from(toSign).toString('base64')}),
          headers: {
            'User-Agent': userAgent,
            'Authorization': `Bearer ${accessToken}`
          }
        });
    })
    .then(res => res.json())
    .then(function signJsonClaimCallback (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      // Request service account signature on header and claimset
      var jwtSignature = body.signature;
      jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
      var form = new FormData();
      form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
      form.append('assertion', jwt);
      return fetch(
        'https://www.googleapis.com/oauth2/v4/token', {
          method: 'POST',
          body: form
        });
    })
    .then(res => res.json())
    .then(function returnJwt (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      return {
        jwt: jwt,
        idToken: body.id_token
      };
    });
}

/**
   * @param {string} url The url that the post request targets.
   * @param {string} body The body of the post request.
   * @param {string} idToken Bearer token used to authorize the iap request.
   * @param {string} userAgent The user agent to identify the requester.
   * @param {string} jwt A Json web token used to authenticate the request.
   */
function makeIapPostRequest (url, body, idToken, userAgent, jwt) {
  return fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      'Authorization': `Bearer ${idToken}`
    },
    body: JSON.stringify(body)
  }).then(function checkIapRequestStatus (res) {
    if (!res.ok) {
      return res.text().then(body => Promise.reject(body));
    }
  });
}

次の依存関係を使用して package.json を更新します。

package.json

{
  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  },
  "engines": {
    "node": ">=4.3.2"
  },
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^2.2.5",
    "ava": "0.25.0",
    "proxyquire": "2.0.0",
    "semistandard": "^12.0.1",
    "sinon": "4.4.2"
  },
  "scripts": {
    "lint": "repo-tools lint",
    "test": "ava -T 20s --verbose test/*.test.js"
  }
}

DAG を設定する

次の DAG には、オブジェクト変更メタデータを出力する BashOperator が含まれています。Cloud Functions イベントに応答して実行するようにするには、環境の DAG フォルダにコピーします(DAG の管理をご覧ください)。

trigger_response_dag.py

import datetime

import airflow
from airflow.operators import bash_operator

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

関数をテストする

Cloud Storage バケットにファイルをアップロードします。DAG がトリガーされ、Cloud Storage の変更に関する情報が BashOperator によってログに記録されたことが確認できます。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...