Writebacks do BigQuery usando ações do Looker em funções do Cloud Run

Muitos clientes da Looker querem capacitar os usuários a ir além dos relatórios sobre os dados no data warehouse para realmente gravar e atualizar esse data warehouse.

Com a API Action, o Looker oferece suporte a esse caso de uso para qualquer data warehouse ou destino. Esta página de documentação orienta os clientes que usam a infraestrutura Google Cloud a implantar uma solução nas funções do Cloud Run para gravar no BigQuery. Nesta página, abordamos os seguintes tópicos:

Considerações sobre a solução

Use esta lista de considerações para validar se a solução está alinhada às suas necessidades.

  • Funções do Cloud Run
    • Por que usar o Cloud Run functions? Como a oferta "sem servidor" do Google, o Cloud Run Functions é uma ótima escolha para facilitar as operações e a manutenção. Uma consideração a ser lembrada é que a latência, principalmente para invocações frias, pode ser maior do que com uma solução que depende de um servidor dedicado.
    • Linguagem e ambiente de execução As funções do Cloud Run são compatíveis com vários idiomas e ambientes de execução. Esta página de documentação vai se concentrar em um exemplo em JavaScript e Node.js. No entanto, os conceitos são diretamente traduzíveis para os outros idiomas e ambientes de execução com suporte.
  • BigQuery
    • Por que o BigQuery? Embora esta página de documentação presuma que você já esteja usando o BigQuery, ele é uma ótima opção para um data warehouse em geral. Lembre-se das seguintes considerações:
      • API BigQuery Storage Write:o BigQuery oferece várias interfaces para atualizar dados no data warehouse, incluindo, por exemplo, instruções de linguagem de manipulação de dados (DML) em trabalhos baseados em SQL. No entanto, a melhor opção para gravações de grande volume é a API BigQuery Storage Write.
      • Adicionar em vez de atualizar:embora essa solução só adicione linhas, e não atualize, é possível derivar tabelas de "estado atual" no momento da consulta de um registro somente de adição, simulando atualizações.
  • Serviços de suporte
    • Secret Manager:o Secret Manager armazena valores secretos para garantir que eles não sejam armazenados em locais muito acessíveis, como diretamente na configuração da função.
    • Gerenciamento de identidade e acesso (IAM): o IAM autoriza a função a acessar o segredo necessário no momento da execução e gravar na tabela do BigQuery.
    • Cloud Build:embora o Cloud Build não seja discutido em detalhes nesta página, o Cloud Run functions o usa em segundo plano. Você pode usar o Cloud Build para automatizar as atualizações implantadas continuamente nas funções com base nas mudanças no código-fonte em um repositório Git.
  • Ação e autenticação do usuário
    • Conta de serviço do Cloud Run: a maneira principal e mais fácil de usar as ações do Looker para integração com os próprios recursos e ativos da sua organização é autenticar as solicitações como provenientes da sua instância do Looker usando o mecanismo de autenticação baseado em token da API Action do Looker e, em seguida, autorizar a função a atualizar dados no BigQuery usando uma conta de serviço.
    • OAuth:outra opção, não abordada nesta página, é usar o recurso OAuth da API Looker Action. Essa abordagem é mais complexa e geralmente não é necessária, mas pode ser usada se você precisar definir o acesso dos usuários finais para gravar na tabela usando o IAM, em vez de usar o acesso deles no Looker ou na lógica ad hoc no código da função.

Tutorial do código de demonstração

Temos um único arquivo com toda a lógica da nossa ação de demonstração disponível no GitHub. Nesta seção, vamos abordar os principais elementos do código.

Código de configuração

A primeira seção tem algumas constantes de demonstração que identificam a tabela em que a ação vai gravar. Na seção Guia de implantação mais adiante nesta página, você vai receber instruções para substituir o ID do projeto pelo seu, que será a única modificação necessária no código.

/*** Demo constants */
const projectId = "your-project-id"
const datasetId = "demo_dataset"
const tableId = "demo_table"

A próxima seção declara e inicializa algumas dependências de código que a ação vai usar. Fornecemos um exemplo que acessa o Secret Manager "no código" usando o módulo Node.js do Secret Manager. No entanto, você também pode eliminar essa dependência de código usando o recurso integrado das funções do Cloud Run para recuperar um segredo durante a inicialização.

/*** Code Dependencies ***/
const crypto = require("crypto")
const {SecretManagerServiceClient} = require('@google-cloud/secret-manager')
const secrets = new SecretManagerServiceClient()
const BigqueryStorage = require('@google-cloud/bigquery-storage')
const BQSManagedWriter = BigqueryStorage.managedwriter

As dependências @google-cloud referenciadas também são declaradas no arquivo package.json para permitir que sejam pré-carregadas e estejam disponíveis para o ambiente de execução do Node.js. crypto é um módulo integrado do Node.js e não é declarado em package.json.

Processamento e roteamento de solicitações HTTP

A interface principal que o código expõe ao ambiente de execução de funções do Cloud Run é uma função JavaScript exportada que segue as convenções do servidor da Web Node.js Express. Especificamente, a função recebe dois argumentos: o primeiro representa a solicitação HTTP, da qual você pode ler vários parâmetros e valores de solicitação, e o segundo representa um objeto de resposta, para o qual você emite os dados de resposta. Embora o nome da função possa ser qualquer coisa, você precisará fornecer o nome às funções do Cloud Run mais tarde, conforme detalhado na seção Guia de implantação.

/*** Entry-point for requests ***/
exports.httpHandler = async function httpHandler(req,res) {

A primeira seção da função httpHandler declara as várias rotas que nossa ação vai reconhecer, refletindo de perto os endpoints necessários da API Action para uma única ação e as funções que vão processar cada rota, definidas mais adiante no arquivo.

Embora alguns exemplos de ações e funções do Cloud Run implantem uma função separada para cada uma dessas rotas para alinhar uma a uma com a rota padrão das funções do Cloud Run, as funções podem aplicar "sub-rotas" adicionais no código, conforme demonstrado aqui. Isso é uma questão de preferência, mas fazer esse roteamento adicional no código minimiza o número de funções que precisamos implantar e ajuda a manter um único estado de código coerente em todos os endpoints das ações.

    const routes = {
        "/": [hubListing],
        "/status": [hubStatus], // Debugging endpoint. Not required.
        "/action-0/form": [
            requireInstanceAuth,
            action0Form
            ], 
        "/action-0/execute": [
            requireInstanceAuth,
            processRequestBody,
            action0Execute
            ]
        }

O restante da função de gerenciador HTTP implementa o processamento da solicitação HTTP em relação às declarações de rota anteriores e conecta os valores de retorno desses gerenciadores ao objeto de resposta.

    try {
        const routeHandlerSequence = routes[req.path] || [routeNotFound]
        for(let handler of routeHandlerSequence) {
            let handlerResponse = await handler(req)
            if (!handlerResponse) continue 
            return res
                .status(handlerResponse.status || 200)
                .json(handlerResponse.body || handlerResponse)
            }
        }
    catch(err) {
        console.error(err)
        res.status(500).json("Unhandled error. See logs for details.")
        }
    }

Com o manipulador HTTP e as declarações de rota fora do caminho, vamos nos aprofundar nos três principais endpoints de ação que precisamos implementar:

Endpoint da lista de ações

Quando um administrador do Looker conecta uma instância do Looker a um servidor de ações pela primeira vez, o Looker chama o URL fornecido, chamado de endpoint da lista de ações, para receber informações sobre as ações disponíveis no servidor.

Nas declarações de rota mostradas anteriormente, disponibilizamos esse endpoint no caminho raiz (/) no URL da função e indicamos que ele seria processado pela função hubListing.

Como você pode ver na definição da função abaixo, não há muito "código", apenas retorna os mesmos dados JSON sempre. Ele inclui dinamicamente o "próprio" URL em alguns campos, permitindo que a instância do Looker envie solicitações posteriores de volta para a mesma função.

async function hubListing(req){
    return {
        integrations: [
            {
                name: "demo-bq-insert",
                label: "Demo BigQuery Insert",
                supported_action_types: ["cell", "query", "dashboard"],
                form_url:`${process.env.CALLBACK_URL_PREFIX}/action-0/form`,
                url: `${process.env.CALLBACK_URL_PREFIX}/action-0/execute`,
                icon_data_uri: "data:image/png;base64,...",
                supported_formats:["inline_json"],
                supported_formattings:["unformatted"],
                required_fields:[
                    // You can use this to make your action available
                    // for specific queries/fields
                    // {tag:"user_id"}
                    ],
                params: [
                    // You can use this to require parameters, either
                    // from the Action's administrative configuration,
                    // or from the invoking user's user attributes. 
                    // A common use case might be to have the Looker
                    // instance pass along the user's identification to
                    // allow you to conditionally authorize the action:
                    {name: "email", label: "Email", user_attribute_name: "email", required: true}
                    ]
                }
            ]
        }
    }

Para fins de demonstração, nosso código não exigiu autenticação para recuperar essa listagem. No entanto, se você considerar os metadados da ação sensíveis, também poderá exigir a autenticação para essa rota, conforme mostrado na próxima seção.

Além disso, nossa função do Cloud Run pode expor e processar várias ações, o que explica nossa convenção de rota de /action-X/.... No entanto, nosso Cloud Run function de demonstração vai implementar apenas uma ação.

Endpoint da ação da ficha

Embora nem todos os casos de uso exijam um formulário, um deles se encaixa bem no caso de recuperação de dados do banco de dados, já que os usuários podem inspecionar dados no Looker e fornecer valores para serem inseridos no banco de dados. Como a lista de ações forneceu um parâmetro form_url, o Looker vai invocar esse endpoint do formulário de ação quando um usuário começar a interagir com sua ação para determinar quais dados adicionais serão capturados dele.

Nas declarações de rota, disponibilizamos esse endpoint no caminho /action-0/form e associamos dois gerenciadores a ele: requireInstanceAuth e action0Form.

Configuramos nossas declarações de rota para permitir vários manipuladores como esse, porque algumas lógicas podem ser reutilizadas para vários endpoints.

Por exemplo, podemos ver que requireInstanceAuth é usado para várias rotas. Usamos esse manipulador sempre que queremos exigir que uma solicitação tenha vindo da nossa instância do Looker. O gerenciador recupera o valor do token esperado do Secret Manager e rejeita todas as solicitações que não têm esse valor.

async function requireInstanceAuth(req) {
    const lookerSecret = await getLookerSecret()
    if(!lookerSecret){return}
    const expectedAuthHeader = `Token token="${lookerSecret}"`
    if(!timingSafeEqual(req.headers.authorization,expectedAuthHeader)){
        return {
            status:401,
            body: {error: "Looker instance authentication is required"}
            }
        }
    return

    function timingSafeEqual(a, b) {
        if(typeof a !== "string"){return}
        if(typeof b !== "string"){return}
        var aLen = Buffer.byteLength(a)
        var bLen = Buffer.byteLength(b)
        const bufA = Buffer.allocUnsafe(aLen)
        bufA.write(a)
        const bufB = Buffer.allocUnsafe(aLen) //Yes, aLen
        bufB.write(b)

        return crypto.timingSafeEqual(bufA, bufB) && aLen === bLen;
        }
    }

Usamos uma implementação timingSafeEqual em vez da verificação de igualdade padrão (==) para evitar o vazamento de informações de temporização de canal lateral que permitiriam que um invasor descobrisse rapidamente o valor do segredo.

Se uma solicitação passar na verificação de autenticação de instância, ela será processada pelo gerenciador action0Form.

async function action0Form(req){
    return [
        {name: "choice",  label: "Choose", type:"select", options:[
            {name:"Yes", label:"Yes"},
            {name:"No", label:"No"},
            {name:"Maybe", label:"Maybe"}
            ]},
        {name: "note", label: "Note", type: "textarea"}
        ]
    }

Embora nosso exemplo de demonstração seja muito estático, o código do formulário pode ser mais interativo para determinados casos de uso. Por exemplo, dependendo da seleção de um usuário em um menu suspenso inicial, campos diferentes podem ser exibidos.

Endpoint de execução de ação

O endpoint de execução de ação é onde a maior parte da lógica de qualquer ação é armazenada e onde vamos abordar a lógica específica do caso de uso de inserção do BigQuery.

Nas declarações de rota, disponibilizamos esse endpoint no caminho /action-0/execute e associamos três gerenciadores a ele: requireInstanceAuth, processRequestBody e action0Execute.

Já abordamos requireInstanceAuth, e o gerenciador processRequestBody fornece um pré-processamento geralmente desinteressante para transformar determinados campos inconvenientes no corpo da solicitação do Looker em um formato mais conveniente, mas você pode consultar o arquivo de código completo.

A função action0Execute começa mostrando exemplos de extração de informações de várias partes da solicitação de ação que podem ser úteis. Na prática, os elementos de solicitação que nosso código se refere como formParams e actionParams podem conter campos diferentes, dependendo do que você declarar nos endpoints de formulário e de ficha.

async function action0Execute (req){
    try{
        // Prepare some data that we will insert
        const scheduledPlanId = req.body.scheduled_plan && req.body.scheduled_plan.scheduled_plan_id
        const formParams = req.body.form_params || {}
        const actionParams = req.body.data || {}
        const queryData = req.body.attachment.data //If using a standard "push" action

        /*In case any fields require datatype-specific preparation, check this example:
        https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_proto2.js
        */

        const newRow = {
            invoked_at: new Date(),
            invoked_by: actionParams.email,
            scheduled_plan_id: scheduledPlanId || null,
            query_result_size: queryData.length,
            choice: formParams.choice,
            note: formParams.note,
            }

Em seguida, o código é convertido em um código padrão do BigQuery para inserir os dados. As APIs BigQuery Storage Write oferecem outras variações mais complexas que são mais adequadas para uma conexão de streaming persistente ou inserções em massa de muitos registros. No entanto, para responder a interações individuais do usuário no contexto de uma função do Cloud Run, essa é a variação mais direta.

await bigqueryConnectAndAppend(newRow)

...

async function bigqueryConnectAndAppend(row){   
    let writerClient
    try{
        const destinationTablePath = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`
        const streamId = `${destinationTablePath}/streams/_default`
        writerClient = new BQSManagedWriter.WriterClient({projectId})
        const writeMetadata = await writerClient.getWriteStream({
            streamId,
            view: 'FULL',
            })
        const protoDescriptor = BigqueryStorage.adapt.convertStorageSchemaToProto2Descriptor(
            writeMetadata.tableSchema,
            'root'
            )
        const connection = await writerClient.createStreamConnection({
            streamId,
            destinationTablePath,
            })
        const writer = new BQSManagedWriter.JSONWriter({
            streamId,
            connection,
            protoDescriptor,
            })

        let result
        if(row){
            // The API expects an array of rows, so wrap the single row in an array
            const rowsToAppend = [row]
            result = await writer.appendRows(rowsToAppend).getResult()
            }
        return {
            streamId: connection.getStreamId(),
            protoDescriptor,
            result
            }
        }
    catch (e) {throw e}
    finally{
        if(writerClient){writerClient.close()}
        }
    }

O código de demonstração também inclui um endpoint "status" para fins de solução de problemas, mas ele não é necessário para a integração da API Action.

Guia de implantação

Por fim, vamos fornecer um guia passo a passo para implantar a demonstração, cobrindo pré-requisitos, implantação de funções do Cloud Run, configuração do BigQuery e configuração do Looker.

Pré-requisitos de projeto e serviço

Antes de começar a configurar detalhes, consulte esta lista para entender quais serviços e políticas a solução vai precisar:

  1. Um novo projeto:você vai precisar de um novo projeto para armazenar os recursos do nosso exemplo.
  2. Serviços:ao usar as funções do BigQuery e do Cloud Run pela primeira vez na interface do Cloud Console, você vai receber uma solicitação para ativar as APIs necessárias para os serviços necessários, incluindo BigQuery, Artifact Registry, Cloud Build, Cloud Functions, Cloud Logging, Pub/Sub, Administrador do Cloud Run e Secret Manager.
  3. Política para invocações não autenticadas:neste caso de uso, é necessário implantar funções do Cloud Run que "permitam invocações não autenticadas", já que vamos processar a autenticação das solicitações recebidas no código de acordo com a API Action, em vez de usar o IAM. Embora isso seja permitido por padrão, a política organizacional geralmente restringe esse uso. Especificamente, a política constraints/iam.allowedPolicyMemberDomains restringe quem pode receber permissões do IAM, e talvez seja necessário ajustá-la para permitir que o principal allUsers tenha acesso não autenticado. Consulte este guia, Como criar serviços públicos do Cloud Run quando o compartilhamento restrito de domínio for aplicado, para mais informações se você não conseguir permitir invocações não autenticadas.
  4. Outras políticas:outras Google Cloud restrições da política da organização também podem impedir a implantação de serviços que são permitidos por padrão.

Como implantar a função do Cloud Run

Depois de criar um novo projeto, siga estas etapas para implantar a função do Cloud Run.

  1. Em Funções do Cloud Run, clique em Criar função.
  2. Escolha qualquer nome para a função (por exemplo, "demo-bq-insert-action").
  3. Nas configurações de Acionador:
    1. O tipo de acionador já precisa ser "HTTPS".
    2. Defina Autenticação como Permitir invocações não autenticadas.
    3. Copie o valor do URL para a área de transferência.
  4. Nas configurações Ambiente de execução > Variáveis de ambiente de execução:
    1. Clique em Adicionar variável.
    2. Defina o nome da variável como CALLBACK_URL_PREFIX.
    3. Cole o URL da etapa anterior como valor.
  5. Clique em Próxima.
  6. Clique no arquivo package.json e cole os conteúdos.
  7. Clique no arquivo index.js e cole os conteúdos.
  8. Atribua a variável projectId na parte de cima do arquivo ao seu próprio ID do projeto.
  9. Defina o Ponto de entrada como httpHandler.
  10. Clique em Implantar.
  11. Conceda as permissões solicitadas (se houver) à conta de serviço do build.
  12. Aguarde a conclusão da implantação.
  13. Se, em alguma etapa futura, você receber um erro que direciona a analisar os registros Google Cloud , saiba que é possível acessar os registros dessa função na guia Registros desta página.
  14. Antes de sair da página da função do Cloud Run, na guia Detalhes, localize e anote a conta de serviço que a função tem. Vamos usar isso em etapas posteriores para garantir que a função tenha as permissões necessárias.
  15. Acesse o URL para testar a implantação da função diretamente no navegador. Você vai receber uma resposta JSON com a ficha da integração.
  16. Se você receber um erro 403, sua tentativa de definir Permitir invocações não autenticadas pode ter falhado silenciosamente devido a uma política da organização. Verifique se a função está permitindo invocações não autenticadas, revise a configuração da política da organização e tente atualizar a configuração.

Acesso à tabela de destino do BigQuery

Na prática, a tabela de destino a ser inserida pode estar em um projeto Google Cloud diferente. No entanto, para fins de demonstração, vamos criar uma nova tabela de destino no mesmo projeto. Em ambos os casos, é necessário garantir que a conta de serviço da função do Cloud Run tenha permissões de gravação na tabela.

  1. Acesse o console do BigQuery.
  2. Crie a tabela de demonstração:

    1. Na barra "Explorer", use o menu de reticências ao lado do projeto e selecione Criar conjunto de dados.
    2. Dê ao conjunto de dados o ID demo_dataset e clique em Criar conjunto de dados.
    3. Use o menu de reticências no conjunto de dados recém-criado e selecione Criar tabela.
    4. Dê o nome demo_table à tabela.
    5. Em Esquema, selecione Editar como texto, use o esquema a seguir e clique em Criar tabela.

      [
       {"name":"invoked_at","type":"TIMESTAMP"},
       {"name":"invoked_by","type":"STRING"},
       {"name":"scheduled_plan_id","type":"STRING"},
       {"name":"query_result_size","type":"INTEGER"},
       {"name":"choice","type":"STRING"},
       {"name":"note","type":"STRING"}
      ]
      
  3. Atribuir permissões:

    1. Na barra Explorer, clique no conjunto de dados.
    2. Na página conjunto de dados, clique em Compartilhamento > Permissões.
    3. Clique em Adicionar principal.
    4. Defina o Novo principal como a conta de serviço da sua função, conforme indicado anteriormente nesta página.
    5. Atribua o papel de Editor de dados do BigQuery.
    6. Clique em Salvar.

Como se conectar ao Looker

Agora que sua função foi implantada, vamos conectar o Looker a ela.

  1. Vamos precisar de uma senha secreta compartilhada para que sua ação autentique que as solicitações estão vindo da sua instância do Looker. Gere uma string longa e aleatória e mantenha-a segura. Vamos usá-lo nas etapas seguintes como o valor secreto do Looker.
  2. No Console do Cloud, navegue até Secret Manager.
    1. Clique em Criar secret.
    2. Defina o Nome como LOOKER_SECRET. Isso está codificado no código desta demonstração, mas você pode escolher qualquer nome ao trabalhar com seu próprio código.
    3. Defina o Valor secreto como o valor secreto gerado.
    4. Clique em Criar secret.
    5. Na página Secret, clique na guia Permissões.
    6. Clique em Permitir acesso.
    7. Defina Novos principais como a conta de serviço da sua função, conforme observado anteriormente.
    8. Atribua o papel de Acessador de secrets do Secret Manager.
    9. Clique em Salvar.
    10. Para confirmar que a função está acessando o segredo, acesse a rota /status anexada ao URL da função.
  3. Na sua instância do Looker:
    1. Acesse "Administrador" > "Plataforma" > "Ações".
    2. Na parte de baixo da página, clique em Adicionar hub de ações.
    3. Informe o URL da sua função (por exemplo, https://sua-região-seu-projeto.cloudfunctions.net/demo-bq-insert-action) e confirme clicando em Adicionar Action Hub.
    4. Agora você vai encontrar uma nova entrada do Action Hub com uma ação chamada Demo BigQuery Insert.
    5. Na entrada do Action Hub, clique em Configurar autorização.
    6. Insira o Looker Secret gerado no campo Authorization Token e clique em Update Token.
    7. Na ação Demo BigQuery Insert, clique em Ativar.
    8. Ative a opção Ativado.
    9. Um teste da ação será executado automaticamente, confirmando que sua função está aceitando a solicitação do Looker e respondendo corretamente ao endpoint do formulário.
    10. Clique em Salvar.

Teste completo

Agora, você pode usar a nova ação. Essa ação é configurada para funcionar com qualquer consulta. Portanto, escolha uma Análise (por exemplo, uma Análise de atividade do sistema integrada), adicione alguns campos a uma nova consulta, execute-a e escolha Enviar no menu de engrenagem. A ação vai aparecer como um dos destinos disponíveis, e você vai receber uma solicitação para inserir algumas informações:

Captura de tela do modal "Send" do Looker com a nova ação selecionada

Ao clicar em Enviar, uma nova linha será inserida na tabela do BigQuery, e o e-mail da sua conta de usuário do Looker será identificado na coluna invoked_by.