Como acionar DAGs (fluxos de trabalho)

Nesta página, você aprende a usar o Cloud Functions para criar acionadores de DAG baseados em eventos.

O Airflow foi projetado para executar DAGs com frequência, mas é possível acioná-los em resposta a eventos, como alterações em um intervalo do Cloud Storage ou mensagens enviadas ao Cloud Pub/Sub. Para isso, você aciona os DAGs do Cloud Composer por meio do Cloud Functions.

No exemplo deste guia, um DAG é executado sempre que ocorre uma mudança em um intervalo do Cloud Storage. Os metadados da mudança no objeto são transmitidos para a configuração do DAG.

Como ativar APIs no projeto

{% dynamic if "no_credentials" in setvar.task_params %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic if not setvar.redirect_url %}{% dynamic setvar redirect_url %}https://console.cloud.google.com{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if setvar.in_henhouse_no_auth_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic endif %}{% dynamic elif setvar.in_henhouse_service_account_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}SERVICE_ACCOUNT{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if not setvar.service_account_roles and setvar.credential_type == "SERVICE_ACCOUNT" %}{% dynamic setvar service_account_roles %}{% dynamic endsetvar %}{% dynamic endif %}{% dynamic setvar console %}{% dynamic if "no_steps" not in setvar.task_params %}
  • {% dynamic endif %}{% dynamic if setvar.api_list %}{% dynamic if setvar.in_henhouse_no_auth_whitelist or setvar.in_henhouse_service_account_whitelist %} Configure um projeto do Console do GCP.

    Configurar um projeto

    Clique para:

    • criar ou selecionar um projeto;
    • ativar {% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}obrigatória{% dynamic endif %}{% dynamic if "," in setvar.api_list %} APIs{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %} para o projeto;
    • {% dynamic if setvar.credential_type == 'SERVICE_ACCOUNT' %}
    • criar uma conta de serviço;
    • fazer o download de uma chave privada como JSON.
    • {% dynamic endif %}

    É possível ver e gerenciar esses recursos a qualquer momento no Console do GCP.

    {% dynamic else %}{% dynamic if "no_text" not in setvar.task_params %} Ativar {% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}obrigatória{% dynamic endif %}{% dynamic if "," in setvar.api_list %} APIs{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %}. {% dynamic endif %}

    Ativar {% dynamic if "," in setvar.api_list %} as APIs{% dynamic else %} a API{% dynamic endif %}

    {% dynamic endif %}{% dynamic endif %}{% dynamic if "no_steps" not in setvar.task_params %}
  • {% dynamic endif %}{% dynamic endsetvar %}{% dynamic print setvar.console %}

    Como conceder permissões de login de blob à conta de serviço do Cloud Functions

    Para se autenticar no Cloud IAP, conceda à conta de serviço do Appspot usada pelo Cloud Functions o papel Service Account Token Creator. Para fazer isso, execute o comando a seguir na ferramenta de linha de comando gcloud ou no Cloud Shell:

    gcloud iam service-accounts add-iam-policy-binding \
    your-project-id@appspot.gserviceaccount.com \
    --member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
    --role=roles/iam.serviceAccountTokenCreator

    Como conseguir o ID do cliente

    Para criar um token e se autenticar no Cloud IAP, a função requer o ID do cliente do proxy que protege o servidor da Web do Airflow. A API do Cloud Composer não fornece essas informações diretamente. Na verdade, você precisa fazer uma solicitação não autenticada no servidor da Web do Airflow e coletar o ID do cliente a partir do URL de redirecionamento. Na amostra de código Python a seguir, você vê como conseguir o ID do cliente:

    import google.auth
    import google.auth.transport.requests
    import requests
    import six.moves.urllib.parse
    
    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = google.auth.transport.requests.AuthorizedSession(
        credentials)
    
    # project_id = 'YOUR_PROJECT_ID'
    # location = 'us-central1'
    # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'
    
    environment_url = (
        'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
        '/environments/{}').format(project_id, location, composer_environment)
    composer_response = authed_session.request('GET', environment_url)
    environment_data = composer_response.json()
    airflow_uri = environment_data['config']['airflowUri']
    
    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']
    
    # Extract the client_id query parameter from the redirect.
    parsed = six.moves.urllib.parse.urlparse(redirect_location)
    query_string = six.moves.urllib.parse.parse_qs(parsed.query)
    print(query_string['client_id'][0])

    Como criar a função

    Crie uma função com os index.js e package.json abaixo, preenchendo as quatro primeiras constantes. Consulte Criar uma função. Ative a opção Tentar novamente em caso de falha.

    Quando você terminar, a função será parecida com esta imagem:

    index.js

    'use strict';
    
    const fetch = require('node-fetch');
    const FormData = require('form-data');
    
    /**
     * Triggered from a message on a Cloud Storage bucket.
     *
     * IAP authorization based on:
     * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
     * and
     * https://cloud.google.com/iap/docs/authentication-howto
     *
     * @param {!Object} event The Cloud Functions event.
     * @returns {Promise}
     */
    exports.triggerDag = function triggerDag (event) {
      // Fill in your Composer environment information here.
    
      // The project that holds your function
      const PROJECT_ID = 'your-project-id';
      // Navigate to your webserver's login page and get this from the URL
      const CLIENT_ID = 'your-iap-client-id';
      // This should be part of your webserver's URL:
      // {tenant-project-id}.appspot.com
      const WEBSERVER_ID = 'your-tenant-project-id';
      // The name of the DAG you wish to trigger
      const DAG_NAME = 'composer_sample_trigger_response_dag';
    
      // Other constants
      const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
      const USER_AGENT = 'gcf-event-trigger';
      const BODY = {'conf': JSON.stringify(event.data)};
    
      // Make the request
      return authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT)
        .then(function iapAuthorizationCallback (iap) {
          return makeIapPostRequest(WEBSERVER_URL, BODY, iap.idToken, USER_AGENT, iap.jwt);
        });
    };
    
    /**
       * @param {string} clientId The client id associated with the Composer webserver application.
       * @param {string} projectId The id for the project containing the Cloud Function.
       * @param {string} userAgent The user agent string which will be provided with the webserver request.
       */
    function authorizeIap (clientId, projectId, userAgent) {
      const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
      const JWT_HEADER = Buffer.from(JSON.stringify({alg: 'RS256', typ: 'JWT'}))
        .toString('base64');
    
      var jwt = '';
      var jwtClaimset = '';
    
      // Obtain an Oauth2 access token for the appspot service account
      return fetch(
        `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
        {
          headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'}
        })
        .then(res => res.json())
        .then(function obtainAccessTokenCallback (tokenResponse) {
          if (tokenResponse.error) {
            return Promise.reject(tokenResponse.error);
          }
          var accessToken = tokenResponse.access_token;
          var iat = Math.floor(new Date().getTime() / 1000);
          var claims = {
            iss: SERVICE_ACCOUNT,
            aud: 'https://www.googleapis.com/oauth2/v4/token',
            iat: iat,
            exp: iat + 60,
            target_audience: clientId
          };
          jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
          var toSign = [JWT_HEADER, jwtClaimset].join('.');
    
          return fetch(
            `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
            {
              method: 'POST',
              body: JSON.stringify({'bytesToSign': Buffer.from(toSign).toString('base64')}),
              headers: {
                'User-Agent': userAgent,
                'Authorization': `Bearer ${accessToken}`
              }
            });
        })
        .then(res => res.json())
        .then(function signJsonClaimCallback (body) {
          if (body.error) {
            return Promise.reject(body.error);
          }
          // Request service account signature on header and claimset
          var jwtSignature = body.signature;
          jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
          var form = new FormData();
          form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
          form.append('assertion', jwt);
          return fetch(
            'https://www.googleapis.com/oauth2/v4/token', {
              method: 'POST',
              body: form
            });
        })
        .then(res => res.json())
        .then(function returnJwt (body) {
          if (body.error) {
            return Promise.reject(body.error);
          }
          return {
            jwt: jwt,
            idToken: body.id_token
          };
        });
    }
    
    /**
       * @param {string} url The url that the post request targets.
       * @param {string} body The body of the post request.
       * @param {string} idToken Bearer token used to authorize the iap request.
       * @param {string} userAgent The user agent to identify the requester.
       * @param {string} jwt A Json web token used to authenticate the request.
       */
    function makeIapPostRequest (url, body, idToken, userAgent, jwt) {
      return fetch(url, {
        method: 'POST',
        headers: {
          'User-Agent': userAgent,
          'Authorization': `Bearer ${idToken}`
        },
        body: JSON.stringify(body)
      }).then(function checkIapRequestStatus (res) {
        if (!res.ok) {
          return res.text().then(body => Promise.reject(body));
        }
      });
    }

    Atualize o package.json com as dependências a seguir.

    package.json

    {
      "name": "nodejs-docs-samples-functions-composer-storage-trigger",
      "version": "0.0.1",
      "dependencies": {
        "form-data": "^2.3.2",
        "node-fetch": "^2.2.0"
      },
      "engines": {
        "node": ">=4.3.2"
      },
      "private": true,
      "license": "Apache-2.0",
      "author": "Google Inc.",
      "repository": {
        "type": "git",
        "url": "https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git"
      },
      "devDependencies": {
        "@google-cloud/nodejs-repo-tools": "^2.2.5",
        "ava": "0.25.0",
        "proxyquire": "2.0.0",
        "semistandard": "^12.0.1",
        "sinon": "4.4.2"
      },
      "scripts": {
        "lint": "repo-tools lint",
        "test": "ava -T 20s --verbose test/*.test.js"
      }
    }
    

    Como configurar o DAG

    O DAG a seguir contém um BashOperator que imprime os metadados de mudança no objeto. Para que ele seja executado em resposta aos eventos do Cloud Functions, basta copiá-lo para a pasta "DAGs" do ambiente. Consulte Gerenciar DAGs.

    trigger_response_dag.py

    import datetime
    
    import airflow
    from airflow.operators import bash_operator
    
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': datetime.datetime(2017, 1, 1),
    }
    
    with airflow.DAG(
            'composer_sample_trigger_response_dag',
            default_args=default_args,
            # Not scheduled, trigger only
            schedule_interval=None) as dag:
    
        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        print_gcs_info = bash_operator.BashOperator(
            task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

    Como testar a função

    Faça upload de um arquivo para o intervalo do Cloud Storage. Você verá que o DAG foi acionado e que as informações sobre a mudança do Cloud Storage foram registradas pelo BashOperator.

    A seguir

    Esta página foi útil? Conte sua opinião sobre:

    Enviar comentários sobre…