Como acionar DAGs (fluxos de trabalho)

Nesta página, você aprende a usar o Cloud Functions para criar acionadores de DAG baseados em eventos.

O Airflow foi projetado para executar DAGs com frequência, mas é possível acioná-los em resposta a eventos, como alterações em um intervalo do Cloud Storage ou mensagens enviadas ao Cloud Pub/Sub. Para isso, você aciona os DAGs do Cloud Composer por meio do Cloud Functions.

No exemplo deste guia, um DAG é executado sempre que ocorre uma mudança em um intervalo do Cloud Storage. Os metadados da mudança no objeto são transmitidos para a configuração do DAG.

Como ativar APIs no projeto

Ativar Google Cloud Functions and Cloud Identity and Google Identity and Access Management (IAM) APIs.

Ativar as APIs

Como conceder permissões de login de blob à conta de serviço do Cloud Functions

Para se autenticar no Cloud IAP, conceda à conta de serviço do Appspot usada pelo Cloud Functions o papel Service Account Token Creator. Para fazer isso, execute o comando a seguir na ferramenta de linha de comando gcloud ou no Cloud Shell:

gcloud iam service-accounts add-iam-policy-binding \
your-project-id@appspot.gserviceaccount.com \
--member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Como conseguir o ID do cliente

Para criar um token e se autenticar no Cloud IAP, a função requer o ID do cliente do proxy que protege o servidor da Web do Airflow. A API do Cloud Composer não fornece essas informações diretamente. Na verdade, você precisa fazer uma solicitação não autenticada no servidor da Web do Airflow e coletar o ID do cliente a partir do URL de redirecionamento. Na amostra de código Python a seguir, você vê como conseguir o ID do cliente:

import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])

Como criar a função

Crie uma função com os index.js e package.json abaixo, preenchendo as quatro primeiras constantes. Consulte Criar uma função. Ative a opção Tentar novamente em caso de falha.

Quando você terminar, a função será parecida com esta imagem:

index.js

'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} event The Cloud Functions event.
 * @returns {Promise}
 */
exports.triggerDag = function triggerDag (event) {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {'conf': JSON.stringify(event.data)};

  // Make the request
  return authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT)
    .then(function iapAuthorizationCallback (iap) {
      return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT, iap.jwt);
    });
};

/**
   * @param {string} clientId The client id associated with the Composer webserver application.
   * @param {string} projectId The id for the project containing the Cloud Function.
   * @param {string} userAgent The user agent string which will be provided with the webserver request.
   */
function authorizeIap (clientId, projectId, userAgent) {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(JSON.stringify({alg: 'RS256', typ: 'JWT'}))
    .toString('base64');

  var jwt = '';
  var jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  return fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'}
    })
    .then(res => res.json())
    .then(function obtainAccessTokenCallback (tokenResponse) {
      if (tokenResponse.error) {
        return Promise.reject(tokenResponse.error);
      }
      var accessToken = tokenResponse.access_token;
      var iat = Math.floor(new Date().getTime() / 1000);
      var claims = {
        iss: SERVICE_ACCOUNT,
        aud: 'https://www.googleapis.com/oauth2/v4/token',
        iat: iat,
        exp: iat + 60,
        target_audience: clientId
      };
      jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
      var toSign = [JWT_HEADER, jwtClaimset].join('.');

      return fetch(
        `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
        {
          method: 'POST',
          body: JSON.stringify({'bytesToSign': Buffer.from(toSign).toString('base64')}),
          headers: {
            'User-Agent': userAgent,
            'Authorization': `Bearer ${accessToken}`
          }
        });
    })
    .then(res => res.json())
    .then(function signJsonClaimCallback (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      // Request service account signature on header and claimset
      var jwtSignature = body.signature;
      jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
      var form = new FormData();
      form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
      form.append('assertion', jwt);
      return fetch(
        'https://www.googleapis.com/oauth2/v4/token', {
          method: 'POST',
          body: form
        });
    })
    .then(res => res.json())
    .then(function returnJwt (body) {
      if (body.error) {
        return Promise.reject(body.error);
      }
      return {
        jwt: jwt,
        idToken: body.id_token
      };
    });
}

/**
   * @param {string} url The url that the post request targets.
   * @param {string} body The body of the post request.
   * @param {string} idToken Bearer token used to authorize the iap request.
   * @param {string} userAgent The user agent to identify the requester.
   * @param {string} jwt A Json web token used to authenticate the request.
   */
function makeIapPostRequest (url, body, idToken, userAgent, jwt) {
  return fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      'Authorization': `Bearer ${idToken}`
    },
    body: JSON.stringify(body)
  }).then(function checkIapRequestStatus (res) {
    if (!res.ok) {
      return res.text().then(body => Promise.reject(body));
    }
  });
}

Atualize o package.json com as dependências a seguir.

package.json

{
  "name": "nodejs-docs-samples-functions-composer-storage-trigger",
  "version": "0.0.1",
  "dependencies": {
    "form-data": "^2.3.2",
    "node-fetch": "^2.2.0"
  },
  "engines": {
    "node": ">=4.3.2"
  },
  "private": true,
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "repository": {
    "type": "git",
    "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
  },
  "devDependencies": {
    "@google-cloud/nodejs-repo-tools": "^2.2.5",
    "ava": "0.25.0",
    "proxyquire": "2.0.0",
    "semistandard": "^12.0.1",
    "sinon": "4.4.2"
  },
  "scripts": {
    "lint": "repo-tools lint",
    "test": "ava -T 20s --verbose test/*.test.js"
  }
}

Como configurar o DAG

O DAG a seguir contém um BashOperator que imprime os metadados de mudança no objeto. Para que ele seja executado em resposta aos eventos do Cloud Functions, basta copiá-lo para a pasta "DAGs" do ambiente. Consulte Gerenciar DAGs.

trigger_response_dag.py

import datetime

import airflow
from airflow.operators import bash_operator

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Como testar a função

Faça upload de um arquivo para o intervalo do Cloud Storage. Você verá que o DAG foi acionado e que as informações sobre a mudança do Cloud Storage foram registradas pelo BashOperator.

A seguir

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…