Writebacks do BigQuery usando ações do Looker em funções do Cloud Run

Muitos clientes do Looker querem capacitar os usuários a ir além da geração de relatórios sobre os dados no data warehouse e gravar e atualizar esse data warehouse.

Pela API Action, o Looker oferece suporte a esse caso de uso para qualquer data warehouse ou destino. Esta página de documentação orienta os clientes que usam a infraestrutura do Google Cloud na implantação de uma solução nas funções do Cloud Run para gravar de volta no BigQuery. Nesta página, abordamos os seguintes tópicos:

Considerações sobre a solução

Use esta lista de considerações para validar se essa solução atende às suas necessidades.

  • Funções do Cloud Run
    • Por que usar as funções do Cloud Run? Como a oferta "sem servidor" do Google, o Cloud Run functions é uma ótima opção para facilitar as operações e a manutenção. Uma consideração importante é que a latência, principalmente para invocações frias, pode ser maior do que com uma solução que depende de um servidor dedicado.
    • Linguagem e ambiente de execução: as funções do Cloud Run são compatíveis com várias linguagens e ambientes de execução. Esta página de documentação vai se concentrar em um exemplo em JavaScript e Node.js. No entanto, os conceitos podem ser traduzidos diretamente para os outros idiomas e tempos de execução compatíveis.
  • BigQuery
    • Por que o BigQuery? Embora esta página de documentação pressuponha que você já esteja usando o BigQuery, ele é uma ótima opção para um data warehouse em geral. Lembre-se das seguintes considerações:
      • API BigQuery Storage Write:o BigQuery oferece várias interfaces para atualizar dados no seu data warehouse, incluindo, por exemplo, instruções da linguagem de manipulação de dados (DML) em jobs baseados em SQL. No entanto, a melhor opção para gravações de alto volume é a API BigQuery Storage Write.
      • Adicionar em vez de atualizar:mesmo que essa solução apenas adicione linhas, não as atualize, é sempre possível derivar tabelas de "estado atual" no momento da consulta de um registro somente de adição, simulando atualizações.
  • Serviços de suporte
    • Secret Manager:o Secret Manager armazena valores de chaves secretas para garantir que eles não sejam armazenados em lugares muito acessíveis, como diretamente na configuração da função.
    • Identity and Access Management (IAM): o IAM autoriza a função a acessar o segredo necessário durante a execução e gravar na tabela do BigQuery pretendida.
    • Cloud Build:embora não seja abordado em detalhes nesta página, o Cloud Run functions o usa em segundo plano. Você pode usar o Cloud Build para automatizar atualizações implantadas continuamente nas suas funções com base em mudanças no código-fonte em um repositório Git.
  • Autenticação de ação e usuário
    • Conta de serviço do Cloud Run: a maneira principal e mais fácil de usar as ações do Looker para integração com os recursos e ativos próprios da sua organização é autenticar as solicitações como provenientes da sua instância do Looker usando o mecanismo de autenticação baseada em token da API Looker Action e, em seguida, autorizar a função a atualizar os dados no BigQuery usando uma conta de serviço.
    • OAuth:outra opção, não abordada nesta página, seria usar o recurso OAuth da API Looker Action. Essa abordagem é mais complexa e geralmente não é necessária, mas pode ser usada se você precisar definir o acesso dos usuários finais para gravar na tabela usando o IAM, em vez de usar o acesso deles no Looker ou uma lógica ad hoc no código da função.

Tutorial do código de demonstração

Temos um único arquivo com toda a lógica da ação de demonstração disponível no GitHub. Nesta seção, vamos explicar os principais elementos do código.

Código de configuração

A primeira seção tem algumas constantes de demonstração que identificam a tabela em que a ação vai gravar. Na seção Guia de implantação, mais adiante nesta página, você vai substituir o ID do projeto pelo seu, que será a única modificação necessária no código.

/*** Demo constants */
const projectId = "your-project-id"
const datasetId = "demo_dataset"
const tableId = "demo_table"

A próxima seção declara e inicializa algumas dependências de código que sua ação vai usar. Fornecemos um exemplo que acessa o Secret Manager "no código" usando o módulo Node.js do Secret Manager. No entanto, também é possível eliminar essa dependência de código usando o recurso integrado das funções do Cloud Run para recuperar um secret durante a inicialização.

/*** Code Dependencies ***/
const crypto = require("crypto")
const {SecretManagerServiceClient} = require('@google-cloud/secret-manager')
const secrets = new SecretManagerServiceClient()
const BigqueryStorage = require('@google-cloud/bigquery-storage')
const BQSManagedWriter = BigqueryStorage.managedwriter

As dependências @google-cloud referenciadas também são declaradas no arquivo package.json para permitir que sejam pré-carregadas e fiquem disponíveis para o ambiente de execução do Node.js. crypto é um módulo integrado do Node.js e não é declarado em package.json.

Processamento e roteamento de solicitações HTTP

A principal interface que seu código expõe ao ambiente de execução de funções do Cloud Run é uma função JavaScript exportada que segue as convenções do servidor da Web Node.js Express. Em particular, sua função recebe dois argumentos: o primeiro representa a solicitação HTTP, de que você pode ler vários parâmetros e valores de solicitação; e o segundo representa um objeto de resposta, a que você emite seus dados de resposta. Embora o nome da função possa ser qualquer coisa, você precisará informá-lo às funções do Cloud Run mais tarde, conforme detalhado na seção Guia de implantação.

/*** Entry-point for requests ***/
exports.httpHandler = async function httpHandler(req,res) {

A primeira seção da função httpHandler declara as várias rotas que nossa ação vai reconhecer, espelhando de perto os endpoints obrigatórios da API Action para uma única ação e as funções que vão processar cada rota, definidas mais adiante no arquivo.

Embora alguns exemplos de ações + funções do Cloud Run implantem uma função separada para cada rota desse tipo, alinhando-se individualmente com o roteamento padrão das funções do Cloud Run, as funções podem aplicar "sub-roteamento" adicional no código, conforme demonstrado aqui. Isso é uma questão de preferência, mas fazer esse roteamento adicional no código minimiza o número de funções que precisamos implantar e ajuda a manter um único estado de código coerente em todos os endpoints das ações.

    const routes = {
        "/": [hubListing],
        "/status": [hubStatus], // Debugging endpoint. Not required.
        "/action-0/form": [
            requireInstanceAuth,
            action0Form
            ], 
        "/action-0/execute": [
            requireInstanceAuth,
            processRequestBody,
            action0Execute
            ]
        }

O restante da função de gerenciador HTTP implementa o processamento da solicitação HTTP em relação às declarações de rota anteriores e conecta os valores de retorno desses gerenciadores ao objeto de resposta.

    try {
        const routeHandlerSequence = routes[req.path] || [routeNotFound]
        for(let handler of routeHandlerSequence) {
            let handlerResponse = await handler(req)
            if (!handlerResponse) continue 
            return res
                .status(handlerResponse.status || 200)
                .json(handlerResponse.body || handlerResponse)
            }
        }
    catch(err) {
        console.error(err)
        res.status(500).json("Unhandled error. See logs for details.")
        }
    }

Com o manipulador HTTP e as declarações de rota concluídos, vamos analisar os três principais endpoints de ação que precisamos implementar:

Endpoint de lista de ações

Quando um administrador do Looker conecta uma instância do Looker a um servidor de ações pela primeira vez, o Looker chama o URL fornecido, chamado de endpoint da lista de ações, para receber informações sobre as ações disponíveis no servidor.

Nas declarações de rota mostradas anteriormente, disponibilizamos esse endpoint no caminho raiz (/) no URL da nossa função e indicamos que ele seria processado pela função hubListing.

Como você pode ver na definição de função a seguir, não há muito "código" nela. Ela apenas retorna os mesmos dados JSON todas as vezes. Uma coisa importante a observar é que ele inclui dinamicamente o "próprio" URL em alguns dos campos, permitindo que a instância do Looker envie solicitações posteriores de volta para a mesma função.

async function hubListing(req){
    return {
        integrations: [
            {
                name: "demo-bq-insert",
                label: "Demo BigQuery Insert",
                supported_action_types: ["cell", "query", "dashboard"],
                form_url:`${process.env.CALLBACK_URL_PREFIX}/action-0/form`,
                url: `${process.env.CALLBACK_URL_PREFIX}/action-0/execute`,
                icon_data_uri: "data:image/png;base64,...",
                supported_formats:["inline_json"],
                supported_formattings:["unformatted"],
                required_fields:[
                    // You can use this to make your action available
                    // for specific queries/fields
                    // {tag:"user_id"}
                    ],
                params: [
                    // You can use this to require parameters, either
                    // from the Action's administrative configuration,
                    // or from the invoking user's user attributes. 
                    // A common use case might be to have the Looker
                    // instance pass along the user's identification to
                    // allow you to conditionally authorize the action:
                    {name: "email", label: "Email", user_attribute_name: "email", required: true}
                    ]
                }
            ]
        }
    }

Para fins de demonstração, nosso código não exigiu autenticação para recuperar essa listagem. No entanto, se você considerar que os metadados da ação são sensíveis, também poderá exigir autenticação para essa rota, conforme mostrado na próxima seção.

Além disso, nossa função do Cloud Run pode expor e processar várias ações, o que explica nossa convenção de rota /action-X/.... No entanto, a função de demonstração do Cloud Run vai implementar apenas uma ação.

Endpoint do formulário de ação

Embora nem todos os casos de uso exijam um formulário, ter um se encaixa bem no caso de uso de writebacks de banco de dados, já que os usuários podem inspecionar os dados no Looker e fornecer valores para serem inseridos no banco de dados. Como nossa lista de ações forneceu um parâmetro form_url, o Looker vai invocar esse endpoint do formulário de ação (link em inglês) quando um usuário começar a interagir com sua ação para determinar quais dados adicionais capturar dele.

Nas declarações de rota, disponibilizamos esse endpoint no caminho /action-0/form e associamos dois manipuladores a ele: requireInstanceAuth e action0Form.

Configuramos nossas declarações de rota para permitir vários manipuladores como este porque algumas lógicas podem ser reutilizadas para vários endpoints.

Por exemplo, podemos ver que requireInstanceAuth é usado para várias rotas. Usamos esse manipulador sempre que queremos exigir que uma solicitação tenha vindo da nossa instância do Looker. O manipulador recupera o valor esperado do token secreto do Secret Manager e rejeita todas as solicitações que não têm esse valor.

async function requireInstanceAuth(req) {
    const lookerSecret = await getLookerSecret()
    if(!lookerSecret){return}
    const expectedAuthHeader = `Token token="${lookerSecret}"`
    if(!timingSafeEqual(req.headers.authorization,expectedAuthHeader)){
        return {
            status:401,
            body: {error: "Looker instance authentication is required"}
            }
        }
    return

    function timingSafeEqual(a, b) {
        if(typeof a !== "string"){return}
        if(typeof b !== "string"){return}
        var aLen = Buffer.byteLength(a)
        var bLen = Buffer.byteLength(b)
        const bufA = Buffer.allocUnsafe(aLen)
        bufA.write(a)
        const bufB = Buffer.allocUnsafe(aLen) //Yes, aLen
        bufB.write(b)

        return crypto.timingSafeEqual(bufA, bufB) && aLen === bLen;
        }
    }

Usamos uma implementação de timingSafeEqual, em vez da verificação de igualdade padrão (==), para evitar o vazamento de informações de tempo do canal lateral que permitiriam a um invasor descobrir rapidamente o valor do nosso segredo.

Supondo que uma solicitação passe na verificação de autenticação da instância, ela será processada pelo gerenciador action0Form.

async function action0Form(req){
    return [
        {name: "choice",  label: "Choose", type:"select", options:[
            {name:"Yes", label:"Yes"},
            {name:"No", label:"No"},
            {name:"Maybe", label:"Maybe"}
            ]},
        {name: "note", label: "Note", type: "textarea"}
        ]
    }

Embora nosso exemplo de demonstração seja muito estático, o código do formulário pode ser mais interativo para determinados casos de uso. Por exemplo, dependendo da seleção de um usuário em um menu suspenso inicial, diferentes campos podem ser mostrados.

Endpoint de execução de ação

O endpoint Action Execute é onde reside a maior parte da lógica de qualquer ação e onde vamos abordar a lógica específica do caso de uso de inserção do BigQuery.

Nas declarações de rota, disponibilizamos esse endpoint no caminho /action-0/execute e associamos três manipuladores a ele: requireInstanceAuth, processRequestBody e action0Execute.

Já abordamos requireInstanceAuth, e o manipulador processRequestBody fornece um pré-processamento principalmente sem interesse para transformar determinados campos inconvenientes no corpo da solicitação do Looker em um formato mais conveniente. No entanto, você pode consultar o arquivo de código completo.

A função action0Execute começa mostrando exemplos de extração de informações de várias partes da solicitação de ação que podem ser úteis. Na prática, os elementos de solicitação que nosso código se refere como formParams e actionParams podem conter campos diferentes, dependendo do que você declara nos endpoints de ficha e formulário.

async function action0Execute (req){
    try{
        // Prepare some data that we will insert
        const scheduledPlanId = req.body.scheduled_plan && req.body.scheduled_plan.scheduled_plan_id
        const formParams = req.body.form_params || {}
        const actionParams = req.body.data || {}
        const queryData = req.body.attachment.data //If using a standard "push" action

        /*In case any fields require datatype-specific preparation, check this example:
        https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_proto2.js
        */

        const newRow = {
            invoked_at: new Date(),
            invoked_by: actionParams.email,
            scheduled_plan_id: scheduledPlanId || null,
            query_result_size: queryData.length,
            choice: formParams.choice,
            note: formParams.note,
            }

Em seguida, o código faz a transição para um código padrão do BigQuery para inserir os dados. As APIs BigQuery Storage Write oferecem outras variações mais complexas que são mais adequadas para uma conexão de streaming persistente ou inserções em massa de muitos registros. No entanto, para responder a interações individuais do usuário no contexto de uma função do Cloud Run, essa é a variação mais direta.

await bigqueryConnectAndAppend(newRow)

...

async function bigqueryConnectAndAppend(row){   
    let writerClient
    try{
        const destinationTablePath = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`
        const streamId = `${destinationTablePath}/streams/_default`
        writerClient = new BQSManagedWriter.WriterClient({projectId})
        const writeMetadata = await writerClient.getWriteStream({
            streamId,
            view: 'FULL',
            })
        const protoDescriptor = BigqueryStorage.adapt.convertStorageSchemaToProto2Descriptor(
            writeMetadata.tableSchema,
            'root'
            )
        const connection = await writerClient.createStreamConnection({
            streamId,
            destinationTablePath,
            })
        const writer = new BQSManagedWriter.JSONWriter({
            streamId,
            connection,
            protoDescriptor,
            })

        let result
        if(row){
            // The API expects an array of rows, so wrap the single row in an array
            const rowsToAppend = [row]
            result = await writer.appendRows(rowsToAppend).getResult()
            }
        return {
            streamId: connection.getStreamId(),
            protoDescriptor,
            result
            }
        }
    catch (e) {throw e}
    finally{
        if(writerClient){writerClient.close()}
        }
    }

O código de demonstração também inclui um endpoint "status" para fins de solução de problemas, mas ele não é necessário para a integração da API Actions.

Guia de implantação

Por fim, vamos fornecer um guia detalhado para implantar a demonstração por conta própria, abordando pré-requisitos, implantação de funções do Cloud Run, configuração do BigQuery e do Looker.

Pré-requisitos de projeto e serviço

Antes de começar a configurar qualquer detalhe, revise esta lista para entender quais serviços e políticas a solução vai precisar:

  1. Um novo projeto:você vai precisar de um novo projeto para hospedar os recursos do nosso exemplo.
  2. Serviços:ao usar as funções do BigQuery e do Cloud Run pela primeira vez na UI do console do Cloud, você vai precisar ativar as APIs necessárias para os serviços, incluindo BigQuery, Artifact Registry, Cloud Build, Cloud Functions, Cloud Logging, Pub/Sub, administrador do Cloud Run e Secret Manager.
  3. Política para invocações não autenticadas:este caso de uso exige que implantemos funções do Cloud Run que permitam invocações não autenticadas, já que vamos processar a autenticação de solicitações recebidas no nosso código de acordo com a API Action, em vez de usar o IAM. Embora isso seja permitido por padrão, a política da organização geralmente restringe esse uso. Especificamente, a política constraints/iam.allowedPolicyMemberDomains restringe quem pode receber permissões do IAM, e talvez seja necessário ajustá-la para permitir o principal allUsers para acesso não autenticado. Consulte este guia, Como criar serviços públicos do Cloud Run quando o compartilhamento restrito de domínio é aplicado, para mais informações se você não conseguir permitir invocações não autenticadas.
  4. Outras políticas:outras Google Cloud restrições da política da organização também podem impedir a implantação de serviços que, de outra forma, seriam permitidos por padrão.

Como implantar a função do Cloud Run

Depois de criar um projeto, siga estas etapas para implantar a função do Cloud Run:

  1. Em Cloud Run functions, clique em Criar função.
  2. Escolha qualquer nome para sua função (por exemplo, "demo-bq-insert-action").
  3. Em Configurações de acionador:
    1. O tipo de acionador já deve ser "HTTPS".
    2. Defina Autenticação como Permitir invocações não autenticadas.
    3. Copie o valor do URL para a área de transferência.
  4. Nas configurações Ambiente de execução > Variáveis de ambiente de execução:
    1. Clique em Adicionar variável.
    2. Defina o nome da variável como CALLBACK_URL_PREFIX.
    3. Cole o URL da etapa anterior como o valor.
  5. Clique em Próxima.
  6. Clique no arquivo package.json e cole o conteúdo.
  7. Clique no arquivo index.js e cole o conteúdo.
  8. Atribua a variável projectId na parte de cima do arquivo ao ID do seu projeto.
  9. Defina o Ponto de entrada como httpHandler.
  10. Clique em Implantar.
  11. Conceda as permissões solicitadas (se houver) à conta de serviço de build.
  12. Aguarde a conclusão da implantação.
  13. Se, em alguma etapa futura, você receber um erro pedindo para revisar os registros Google Cloud , saiba que é possível acessar os registros dessa função na guia Registros desta página.
  14. Antes de sair da página da função do Cloud Run, na guia Detalhes, localize e anote a Conta de serviço da função. Vamos usar isso nas etapas posteriores para garantir que a função tenha as permissões necessárias.
  15. Teste a implantação da função diretamente no navegador acessando o URL. Você vai ver uma resposta JSON contendo a página da sua integração.
  16. Se você receber um erro 403, sua tentativa de definir Permitir invocações não autenticadas pode ter falhado silenciosamente como resultado de uma política da organização. Verifique se a função permite invocações não autenticadas, analise a configuração da política da organização e tente atualizar a configuração.

Acesso à tabela de destino do BigQuery

Na prática, a tabela de destino em que os dados serão inseridos pode estar em um projeto Google Cloud diferente. No entanto, para fins de demonstração, vamos criar uma nova tabela de destino no mesmo projeto. Em qualquer caso, verifique se a conta de serviço da função do Cloud Run tem permissões para gravar na tabela.

  1. Navegue até o console do BigQuery.
  2. Crie a tabela de demonstração:

    1. Na barra do Explorer, use o menu de reticências ao lado do projeto e selecione Criar conjunto de dados.
    2. Dê ao conjunto de dados o ID demo_dataset e clique em Criar conjunto de dados.
    3. Use o menu de reticências no conjunto de dados recém-criado e selecione Criar tabela.
    4. Dê à tabela o nome demo_table.
    5. Em Esquema, selecione Editar como texto, use o esquema a seguir e clique em Criar tabela.

      [
       {"name":"invoked_at","type":"TIMESTAMP"},
       {"name":"invoked_by","type":"STRING"},
       {"name":"scheduled_plan_id","type":"STRING"},
       {"name":"query_result_size","type":"INTEGER"},
       {"name":"choice","type":"STRING"},
       {"name":"note","type":"STRING"}
      ]
      
  3. Atribuir permissões:

    1. Na barra Explorer, clique no conjunto de dados.
    2. Na página conjunto de dados, clique em Compartilhamento > Permissões.
    3. Clique em Adicionar principal.
    4. Defina o Novo principal como a conta de serviço da sua função, observada anteriormente nesta página.
    5. Atribua o papel Editor de dados do BigQuery.
    6. Clique em Salvar.

Conectando ao Looker

Agora que sua função foi implantada, vamos conectar o Looker a ela.

  1. Precisamos de uma senha secreta compartilhada para que sua ação autentique que as solicitações estão vindo da sua instância do Looker. Gere uma string longa e aleatória e mantenha-a em segurança. Vamos usá-lo nas etapas subsequentes como o valor do segredo do Looker.
  2. No console do Cloud, navegue até Secret Manager.
    1. Clique em Criar secret.
    2. Defina o Nome como LOOKER_SECRET. (Isso é codificado no código desta demonstração, mas você pode escolher qualquer nome ao trabalhar com seu próprio código.)
    3. Defina o Valor secreto como o valor secreto que você gerou.
    4. Clique em Criar secret.
    5. Na página Secret, clique na guia Permissões.
    6. Clique em Permitir acesso.
    7. Defina Novos principais como a conta de serviço da sua função, anotada anteriormente.
    8. Atribua o papel Acessador de secrets do Secret Manager.
    9. Clique em Salvar.
    10. Para confirmar se a função está acessando o secret, acesse a rota /status anexada ao URL da função.
  3. Na sua instância do Looker:
    1. Acesse "Administrador" > "Plataforma" > "Ações".
    2. Role até a parte de baixo da página e clique em Adicionar central de ações.
    3. Forneça o URL da sua função (por exemplo, https://your-region-your-project.cloudfunctions.net/demo-bq-insert-action) e confirme clicando em Adicionar central de ações.
    4. Agora você vai ver uma nova entrada na Central de ações com uma ação chamada Demo BigQuery Insert.
    5. Na entrada da Central de ações, clique em Configurar autorização.
    6. Insira o segredo do Looker gerado no campo Token de autorização e clique em Atualizar token.
    7. Na ação Inserção de demonstração do BigQuery, clique em Ativar.
    8. Ative a opção Ativado.
    9. Um teste da ação será executado automaticamente, confirmando que sua função está aceitando a solicitação do Looker e respondendo corretamente ao endpoint do formulário.
    10. Clique em Salvar.

Teste de ponta a ponta

Agora podemos usar nossa nova ação. Essa ação é configurada para funcionar com qualquer consulta. Portanto, escolha uma Análise (por exemplo, uma Análise de atividade do sistema integrada), adicione alguns campos a uma nova consulta, execute-a e escolha Enviar no menu de engrenagem. A ação vai aparecer como um dos destinos disponíveis, e você vai precisar inserir alguns campos:

Captura de tela do modal "Enviar" do Looker com nossa nova ação selecionada

Ao pressionar Enviar, uma nova linha será inserida na sua tabela do BigQuery, e o e-mail da sua conta de usuário do Looker será identificado na coluna invoked_by.