Acione DAGs do Cloud Composer com funções do Cloud Run e a API REST do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar as funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.

O Apache Airflow foi concebido para executar DAGs de forma regular, mas também pode acionar DAGs em resposta a eventos. Uma forma de o fazer é usar as funções do Cloud Run para acionar DAGs do Cloud Composer quando ocorre um evento especificado.

O exemplo neste guia executa um DAG sempre que ocorre uma alteração num contentor do Cloud Storage. As alterações a qualquer objeto num contentor acionam uma função. Esta função faz um pedido à API REST do Airflow do seu ambiente do Cloud Composer. O Airflow processa este pedido e executa um DAG. O DAG produz informações sobre a alteração.

Antes de começar

Verifique a configuração de rede do seu ambiente

Esta solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conetividade das funções do Cloud Run ao servidor Web do Airflow nestas configurações.

No Cloud Composer 2, pode usar outra abordagem: Acione DAGs com funções do Cloud Run e mensagens do Pub/Sub

Ative APIs para o seu projeto

Consola

Enable the Cloud Composer and Cloud Run functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Ative a API REST do Airflow

Consoante a sua versão do Airflow:

Permita chamadas à API Airflow REST através do controlo de acesso do servidor Web

As funções do Cloud Run podem contactar a API REST Airflow através do endereço IPv4 ou IPv6.

Se não tiver a certeza do que será o intervalo de IPs de chamadas, use uma opção de configuração predefinida no controlo de acesso do servidor Web, que é All IP addresses have access (default)para não bloquear acidentalmente as suas funções do Cloud Run.

Crie um contentor do Cloud Storage

Este exemplo aciona um DAG em resposta a alterações num contentor do Cloud Storage. Crie um novo contentor para usar neste exemplo.

Obtenha o URL do servidor Web do Airflow

Este exemplo faz pedidos da API REST ao ponto final do servidor Web do Airflow. Use a parte do URL da interface Web do Airflow antes de .appspot.com no código da função do Google Cloud.

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Clique no nome do seu ambiente.

  3. Na página Detalhes do ambiente, aceda ao separador Configuração do ambiente.

  4. O URL do servidor Web do Airflow está listado no item IU Web do Airflow.

gcloud

Execute o seguinte comando:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.

Obtenha o client_id do proxy IAM

Para fazer um pedido ao ponto final da API REST do Airflow, a função requer o ID de cliente do proxy de gestão de identidades e acessos que protege o servidor Web do Airflow.

O Cloud Composer não fornece estas informações diretamente. Em alternativa, faça um pedido não autenticado ao servidor Web do Airflow e capture o ID do cliente a partir do URL de redirecionamento:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Substitua AIRFLOW_URL pelo URL da interface Web do Airflow.

Na saída, pesquise a string que se segue a client_id. Por exemplo:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Guarde o seguinte código num ficheiro denominado get_client_id.py. Preencha os valores de project_id, location e composer_environment e, em seguida, execute o código no Cloud Shell ou no seu ambiente local.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Carregue um DAG para o seu ambiente

Carregue um DAG para o seu ambiente. O exemplo de DAG seguinte produz a configuração de execução do DAG recebida. Aciona este DAG a partir de uma função, que cria mais tarde neste guia.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

Implemente uma função do Cloud que acione o DAG

Pode implementar uma função do Cloud com o seu idioma preferido suportado pelas funções do Cloud Run ou pelo Cloud Run. Este tutorial demonstra uma função do Google Cloud implementada em Python e Java.

Especifique parâmetros de configuração da função do Google Cloud

  • Acionador. Para este exemplo, selecione um acionador que funcione quando um novo objeto é criado num contentor ou um objeto existente é substituído.

    • Tipo de acionador. Cloud Storage.

    • Tipo de evento. Finalizar / criar.

    • Segmento. Selecione um contentor que tem de acionar esta função.

    • Tentar novamente em caso de falha. Recomendamos que desative esta opção para os fins deste exemplo. Se usar a sua própria função num ambiente de produção, ative esta opção para resolver erros transitórios.

  • Conta de serviço de tempo de execução na secção Definições de tempo de execução, compilação, ligações e segurança. Use uma das seguintes opções, consoante as suas preferências:

    • Selecione Conta de serviço predefinida do Compute Engine. Com as autorizações do IAM predefinidas, esta conta pode executar funções que acedem a ambientes do Cloud Composer.

    • Crie uma conta de serviço personalizada com a função Utilizador do Composer e especifique-a como uma conta de serviço de tempo de execução para esta função. Esta opção segue o princípio do privilégio mínimo.

  • Tempo de execução e ponto de entrada, no passo Código. Quando adicionar código para este exemplo, selecione o tempo de execução Python 3.7 ou posterior e especifique trigger_dag como ponto de entrada.

Adicione requisitos

Especifique as dependências no ficheiro requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

Coloque o seguinte código no ficheiro main.py e faça as seguintes substituições:

  • Substitua o valor da variável client_id pelo valor client_id que obteve anteriormente.

  • Substitua o valor da variável webserver_id pelo ID do projeto de inquilino, que faz parte do URL da interface Web do Airflow antes de .appspot.com. Obteve o URL da interface Web do Airflow anteriormente.

  • Especifique a versão da API REST Airflow que usa:

    • Se usar a API REST do Airflow estável, defina a variável USE_EXPERIMENTAL_API como False.
    • Se usar a API REST do Airflow experimental, não tem de fazer nenhuma alteração. A variável USE_EXPERIMENTAL_API já está definida como True.


from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

Teste a sua função

Para verificar se a função e o DAG funcionam conforme esperado:

  1. Aguarde até que a função seja implementada.
  2. Carregue um ficheiro para o seu contentor do Cloud Storage. Em alternativa, pode acionar a função manualmente selecionando a ação Testar a função para a mesma na Google Cloud consola.
  3. Verifique a página DAG na interface Web do Airflow. O DAG deve ter uma execução de DAG ativa ou já concluída.
  4. Na IU do Airflow, verifique os registos de tarefas desta execução. Deverá ver que a tarefa print_gcs_info produz os dados recebidos da função nos registos:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

O que se segue?