Triggering DAGs (workflows)

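This page describes how to use Cloud Functions to trigger DAGs in response to events.

Although Airflow is designed to run DAGs on a regular schedule, you can also trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub. To do so, you can trigger Cloud Composer DAGs from a Cloud Functions function.

In the example in this guide, a DAG runs every time a change occurs in a Cloud Storage bucket. The object change metadata is passed into the DAG's configuration.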
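
As a rough illustration of that hand-off, the sketch below mirrors in Python how the index.js sample on this page wraps the event data: the metadata is serialized to a JSON string and placed under a conf key. The helper name build_trigger_body and the sample metadata are hypothetical; real Cloud Storage events carry many more fields.

```python
import json


def build_trigger_body(event_data):
    """Wrap event metadata in the body shape used to start a DAG run.

    The metadata is serialized to a JSON string and stored under "conf".
    """
    return {"conf": json.dumps(event_data)}


# Hypothetical object change metadata, for illustration only.
sample_event = {"bucket": "example-bucket", "name": "data/report.csv"}
body = build_trigger_body(sample_event)
print(json.dumps(body))
```

Decoding body["conf"] with json.loads gives back the original metadata.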

Enabling APIs for your project

Enable the required APIs for your project in the GCP Console.

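    Granting blob signing permissions to the Cloud Functions service account

    To authenticate to Cloud IAP, grant the Appspot service account (used by Cloud Functions) the Service Account Token Creator role on itself. To do this, run the following command in the gcloud command-line tool or Cloud Shell: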

    gcloud iam service-accounts add-iam-policy-binding \
    your-project-id@appspot.gserviceaccount.com \
    --member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
    --role=roles/iam.serviceAccountTokenCreator

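    Getting the client ID

    To construct a token for authenticating to Cloud IAP, the function requires the client ID of the proxy that protects the Airflow web server. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow web server and capture the client ID from the redirect URL. The following Python code sample demonstrates how to get the client ID: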

    import google.auth
    import google.auth.transport.requests
    import requests
    import six.moves.urllib.parse
    
    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = google.auth.transport.requests.AuthorizedSession(
        credentials)
    
    # project_id = 'YOUR_PROJECT_ID'
    # location = 'us-central1'
    # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'
    
    environment_url = (
        'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
        '/environments/{}').format(project_id, location, composer_environment)
    composer_response = authed_session.request('GET', environment_url)
    environment_data = composer_response.json()
    airflow_uri = environment_data['config']['airflowUri']
    
    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']
    
    # Extract the client_id query parameter from the redirect.
    parsed = six.moves.urllib.parse.urlparse(redirect_location)
    query_string = six.moves.urllib.parse.parse_qs(parsed.query)
    print(query_string['client_id'][0])
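
The parsing step at the end of that sample can be tried offline. The following is a minimal sketch using only the Python 3 standard library (urllib.parse rather than six); the helper name extract_client_id and the redirect URL are made up for illustration and are not values returned by any real environment.

```python
from urllib.parse import parse_qs, urlparse


def extract_client_id(redirect_location):
    """Return the client_id query parameter from a redirect URL."""
    query = parse_qs(urlparse(redirect_location).query)
    values = query.get("client_id")
    if not values:
        raise ValueError("redirect URL has no client_id parameter")
    return values[0]


# Hypothetical redirect URL, for illustration only.
sample = (
    "https://accounts.google.com/o/oauth2/v2/auth"
    "?client_id=1234567890-example.apps.googleusercontent.com"
    "&response_type=code&scope=openid+email"
)
print(extract_client_id(sample))
```

Raising an error when the parameter is missing is a design choice of this sketch; it makes a changed redirect format fail loudly instead of producing an empty client ID.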

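    Creating your function

    To create your function, use the following index.js and package.json, making sure to fill in the first four constants (see Creating a function). Enable the Retry on failure option.

    When you are done, your function should resemble the one shown in the following figure: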

    index.js

    'use strict';
    
    const fetch = require('node-fetch');
    const FormData = require('form-data');
    
    /**
     * Triggered from a message on a Cloud Storage bucket.
     *
     * IAP authorization based on:
     * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
     * and
     * https://cloud.google.com/iap/docs/authentication-howto
     *
     * @param {!Object} event The Cloud Functions event.
     * @returns {Promise}
     */
    exports.triggerDag = function triggerDag (event) {
      // Fill in your Composer environment information here.
    
      // The project that holds your function
      const PROJECT_ID = 'your-project-id';
      // Navigate to your webserver's login page and get this from the URL
      const CLIENT_ID = 'your-iap-client-id';
      // This should be part of your webserver's URL:
      // {tenant-project-id}.appspot.com
      const WEBSERVER_ID = 'your-tenant-project-id';
      // The name of the DAG you wish to trigger
      const DAG_NAME = 'composer_sample_trigger_response_dag';
    
      // Other constants
      const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
      const USER_AGENT = 'gcf-event-trigger';
      const BODY = {'conf': JSON.stringify(event.data)};
    
      // Make the request
      return authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT)
        .then(function iapAuthorizationCallback (iap) {
          return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT, iap.jwt);
        });
    };
    
    /**
     * @param {string} clientId The client id associated with the Composer webserver application.
     * @param {string} projectId The id for the project containing the Cloud Function.
     * @param {string} userAgent The user agent string which will be provided with the webserver request.
     */
    function authorizeIap (clientId, projectId, userAgent) {
      const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
      const JWT_HEADER = Buffer.from(JSON.stringify({alg: 'RS256', typ: 'JWT'}))
        .toString('base64');
    
      var jwt = '';
      var jwtClaimset = '';
    
      // Obtain an Oauth2 access token for the appspot service account
      return fetch(
        `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
        {
          headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'}
        })
        .then(res => res.json())
        .then(function obtainAccessTokenCallback (tokenResponse) {
          if (tokenResponse.error) {
            return Promise.reject(tokenResponse.error);
          }
          var accessToken = tokenResponse.access_token;
          var iat = Math.floor(new Date().getTime() / 1000);
          var claims = {
            iss: SERVICE_ACCOUNT,
            aud: 'https://www.googleapis.com/oauth2/v4/token',
            iat: iat,
            exp: iat + 60,
            target_audience: clientId
          };
          jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
          var toSign = [JWT_HEADER, jwtClaimset].join('.');
    
          return fetch(
            `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
            {
              method: 'POST',
              body: JSON.stringify({'bytesToSign': Buffer.from(toSign).toString('base64')}),
              headers: {
                'User-Agent': userAgent,
                'Authorization': `Bearer ${accessToken}`
              }
            });
        })
        .then(res => res.json())
        .then(function signJsonClaimCallback (body) {
          if (body.error) {
            return Promise.reject(body.error);
          }
          // Request service account signature on header and claimset
          var jwtSignature = body.signature;
          jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
          var form = new FormData();
          form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
          form.append('assertion', jwt);
          return fetch(
            'https://www.googleapis.com/oauth2/v4/token', {
              method: 'POST',
              body: form
            });
        })
        .then(res => res.json())
        .then(function returnJwt (body) {
          if (body.error) {
            return Promise.reject(body.error);
          }
          return {
            jwt: jwt,
            idToken: body.id_token
          };
        });
    }
    
    /**
     * @param {string} url The url that the post request targets.
     * @param {!Object} body The body of the post request; it is serialized to JSON before sending.
     * @param {string} idToken Bearer token used to authorize the iap request.
     * @param {string} userAgent The user agent to identify the requester.
     * @param {string} jwt A Json web token used to authenticate the request.
     */
    function makeIapPostRequest (url, body, idToken, userAgent, jwt) {
      return fetch(url, {
        method: 'POST',
        headers: {
          'User-Agent': userAgent,
          'Authorization': `Bearer ${idToken}`
        },
        body: JSON.stringify(body)
      }).then(function checkIapRequestStatus (res) {
        if (!res.ok) {
          return res.text().then(body => Promise.reject(body));
        }
      });
    }
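
To make the claim set built in authorizeIap easier to inspect, here is a minimal Python sketch of the same unsigned "header.claims" string. It is an illustration, not part of the function: the helper name build_unsigned_jwt and the sample values are hypothetical, and the signing and token exchange steps are deliberately left out.

```python
import base64
import json
import time


def build_unsigned_jwt(service_account, client_id, now=None):
    """Build the "header.claims" string that is later sent for signing."""
    iat = int(time.time()) if now is None else int(now)
    header = {"alg": "RS256", "typ": "JWT"}
    claims = {
        "iss": service_account,
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "iat": iat,
        "exp": iat + 60,  # Valid for 60 seconds, as in the sample.
        "target_audience": client_id,
    }

    def encode(obj):
        # Compact JSON, then standard base64, following the Node.js sample.
        text = json.dumps(obj, separators=(",", ":"))
        return base64.b64encode(text.encode("utf-8")).decode("ascii")

    return ".".join([encode(header), encode(claims)])


# Hypothetical values, for illustration only.
unsigned = build_unsigned_jwt(
    "your-project-id@appspot.gserviceaccount.com",
    "your-iap-client-id",
    now=1500000000,
)
print(unsigned)
```

Passing a fixed now value keeps the output reproducible when experimenting; omit it to use the current time.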

    Update your package.json with the following dependencies:

    package.json

    {
      "name": "nodejs-docs-samples-functions-composer-storage-trigger",
      "version": "0.0.1",
      "dependencies": {
        "form-data": "^2.3.2",
        "node-fetch": "^2.2.0"
      },
      "engines": {
        "node": ">=4.3.2"
      },
      "private": true,
      "license": "Apache-2.0",
      "author": "Google Inc.",
      "repository": {
        "type": "git",
        "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
      },
      "devDependencies": {
        "@google-cloud/nodejs-repo-tools": "^2.2.5",
        "ava": "0.25.0",
        "proxyquire": "2.0.0",
        "semistandard": "^12.0.1",
        "sinon": "4.4.2"
      },
      "scripts": {
        "lint": "repo-tools lint",
        "test": "ava -T 20s --verbose test/*.test.js"
      }
    }
    

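    Setting up your DAG

    The following DAG contains a BashOperator that prints the object change metadata. To have this DAG run in response to Cloud Functions events, copy it into your environment's DAGs folder (see Managing DAGs).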

    trigger_response_dag.py

    import datetime
    
    import airflow
    from airflow.operators import bash_operator
    
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': datetime.datetime(2017, 1, 1),
    }
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            default_args=default_args,
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = bash_operator.BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
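
If a task needs individual fields rather than the whole configuration, a small helper along the following lines could be used. This is a sketch under assumptions: describe_gcs_change is a hypothetical name, the configuration is assumed to arrive either as a dict or as a JSON string, and it is assumed to contain the bucket and name fields found in Cloud Storage object metadata.

```python
import json


def describe_gcs_change(conf):
    """Summarize the object change stored in a DAG run's configuration."""
    if isinstance(conf, str):
        # Assumption: a string configuration holds JSON-encoded metadata.
        conf = json.loads(conf)
    conf = conf or {}
    bucket = conf.get("bucket", "<unknown bucket>")
    name = conf.get("name", "<unknown object>")
    return "gs://{}/{}".format(bucket, name)


# Hypothetical configuration, for illustration only.
print(describe_gcs_change({"bucket": "example-bucket", "name": "data/report.csv"}))
```

The fallbacks for missing keys keep the helper usable when the DAG is triggered manually without any configuration.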

    Testing your function

    Upload a file to your Cloud Storage bucket. You should see that the DAG has been triggered and that the BashOperator has logged information about the Cloud Storage change.
