Criar funções definidas pelo usuário para modelos do Dataflow

Alguns modelos do Dataflow fornecidos pelo Google são compatíveis com funções definidas pelo usuário (UDFs). As UDFs permitem estender a funcionalidade de um modelo sem modificar o código.

Visão geral

Para criar uma UDF, escreva uma função JavaScript ou Python, dependendo do modelo. Você armazena o arquivo de código da UDF no Cloud Storage e especifica o local como um parâmetro do modelo. Para cada elemento de entrada, o modelo chama sua função. A função transforma o elemento ou executa outra lógica personalizada e retorna o resultado ao modelo.

Por exemplo, uma UDF pode ser usada para:

  • Reformatar os dados de entrada para corresponder a um esquema de destino.
  • Editar dados confidenciais
  • Filtre alguns elementos da saída.

A entrada para a função UDF é um único elemento de dados serializado como uma string JSON. A função retorna uma string JSON serializada como saída. O formato dos dados depende do modelo. Por exemplo, no modelo Assinatura do Pub/Sub para BigQuery, a entrada são os dados da mensagem do Pub/Sub serializados como um objeto JSON, e a saída é um objeto JSON serializado que representa uma linha da tabela do BigQuery. Para mais informações, consulte a documentação de cada modelo.

Executar um modelo com uma UDF

Para executar um modelo com uma UDF, especifique o local do Cloud Storage do arquivo JavaScript e o nome da função como parâmetros do modelo.

Com alguns modelos fornecidos pelo Google, é possível criar a UDF diretamente no Console do Google Cloud da seguinte maneira:

  1. Acesse a página do Dataflow no Console do Google Cloud.

    Acessar a página do Dataflow

  2. Clique em Criar job usando um modelo.

  3. Selecione o modelo fornecido pelo Google que você quer executar.

  4. Abra Parâmetros opcionais. Se o modelo for compatível com UDFs, ele terá um parâmetro para o local do Cloud Storage da UDF e outro para o nome da função.

  5. Ao lado do parâmetro do modelo, clique em Criar UDF.

  6. No painel Selecionar ou criar uma função definida pelo usuário (UDF):

    1. Insira um nome para o arquivo. Exemplo: my_udf.js.
    2. Selecione uma pasta do Cloud Storage. Exemplo: gs://your-bucket/your-folder.
    3. Use o editor de código in-line para escrever a função. O editor é pré-preenchido com um código boilerplate que pode ser usado como ponto de partida.
    4. Clique em Criar UDF.

      O console do Google Cloud salva o arquivo UDF e preenche o local do Cloud Storage.

    5. Digite o nome da sua função no campo correspondente.

Criar uma UDF em JavaScript

O código a seguir mostra uma UDF do JavaScript em ambiente autônomo que pode ser iniciada:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

O código JavaScript é executado no mecanismo JavaScript Nashorn. Recomendamos testar a UDF no mecanismo Nashorn antes da implantação. O mecanismo Nashorn não corresponde exatamente à implementação do JavaScript no Node.js. Um problema comum é usar console.log() ou Number.isNaN(), e nenhum deles é definido no mecanismo Nashorn.

É possível testar sua UDF no mecanismo Nashorn usando o Cloud Shell, que tem o JDK 11 pré-instalado. Inicie o Nashorn no modo interativo da seguinte maneira:

jjs --language=es6

No shell interativo da Nashorn, siga estas etapas:

  1. Chame load para carregar o arquivo JavaScript UDF.
  2. Defina um objeto JSON de entrada, dependendo das mensagens esperadas do pipeline.
  3. Use a função JSON.stringify para serializar a entrada para uma string JSON.
  4. Chame a função UDF para processar a string JSON.
  5. Chame JSON.parse para desserializar a saída.
  6. Verificar o resultado.

Exemplo:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Gravar uma UDF em Python

O código a seguir mostra uma UDF do Python em ambiente autônomo que pode ser iniciada:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

As UDFs do Python são compatíveis com pacotes de dependência padrão para o Python e o Apache Beam. Não é possível usar pacotes de terceiros.

Tratamento de erros

Normalmente, quando ocorre um erro durante a execução da UDF, ele é gravado em um local de mensagens inativas. Os detalhes dependem do modelo. Por exemplo, o modelo Assinatura do Pub/Sub para BigQuery cria uma tabela _error_records e grava erros nela. Os erros de UDF do ambiente de execução podem ocorrer devido a erros de sintaxe ou exceções não identificadas. Para verificar se há erros de sintaxe, teste sua UDF localmente.

É possível lançar programaticamente uma exceção para um elemento que não deve ser processado. Nesse caso, o elemento é gravado no local de mensagens inativas, se o modelo for compatível com uma delas. Para ver um exemplo que mostra essa abordagem, consulte Eventos de rota.

Exemplos de casos de uso

Nesta seção, descrevemos alguns padrões comuns para UDFs, com base em casos de uso reais.

Eventos avançados

Use uma UDF para enriquecer eventos com novos campos para mais informações contextuais.

Exemplo:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Eventos de transformação

Use uma UDF para transformar todo o formato do evento, dependendo do que o destino espera.

No exemplo a seguir, uma entrada de registro do Cloud Logging (LogEntry) é revertida para a string de registro original, quando disponível. Dependendo da origem do registro, a string de registro original às vezes é preenchida no campo textPayload. Você pode usar esse padrão para enviar os registros brutos no formato original em vez de enviar todo o LogEntry do Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Editar ou remover dados de eventos

Use uma UDF para editar ou remover uma parte do evento.

O exemplo a seguir edita o nome do campo sensitiveField substituindo o valor dele e remove completamente o campo chamado redundantField.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Eventos de rota

Use uma UDF para rotear eventos para separar destinos no coletor downstream.

O exemplo a seguir, com base no modelo Pub/Sub para Splunk, encaminha cada evento para o índice correto do Splunk. Ele chama uma função local definida pelo usuário para mapear eventos para índices.

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

O próximo exemplo encaminha eventos não reconhecidos para a fila de mensagens inativas, supondo que o modelo seja compatível com essa fila. Por exemplo, consulte o modelo do Pub/Sub para JDBC. É possível usar esse padrão para filtrar entradas inesperadas antes de gravar no destino.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filtrar eventos

Use uma UDF para filtrar a saída de eventos indesejados ou não reconhecidos.

O exemplo a seguir descarta eventos em que data.severity é igual a "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

A seguir