Padrões de tarefas do Java

O aplicativo de amostra de comércio eletrônico apresenta as práticas recomendadas para usar o Dataflow para implementar a análise de dados de streaming e a IA em tempo real. O exemplo contém padrões de tarefas que mostram a melhor maneira de realizar tarefas de programação em Java. Essas tarefas são normalmente necessárias para criar aplicativos de comércio eletrônico.

O app contém os padrões de tarefas do Java a seguir:

Usar esquemas do Apache Beam para trabalhar com dados estruturados

Use esquemas do Apache Beam para facilitar o processamento de dados estruturados.

Converter seus objetos em linhas permite produzir um código em Java muito limpo, o que facilita o exercício de criação de gráficos acíclicos dirigidos (DAG, na sigla em inglês). Também é possível fazer referência às propriedades do objeto como campos nas instruções de análise que você criou, em vez de precisar chamar métodos.

Exemplo

CountViewsPerProduct.java

Usar a JsonToRow para converter dados JSON

O processamento de strings JSON no Dataflow é uma necessidade comum. Por exemplo, as strings JSON são processadas ao transmitir informações de sequência de cliques capturadas de aplicativos da Web. Para processar strings JSON, é preciso convertê-las em linhas ou objetos Java antigos e simples (POJOs, na sigla em inglês) durante o processamento do pipeline.

É possível usar a transformação integrada do Apache Beam JsonToRow para converter strings JSON em linhas. No entanto, se você quiser uma fila para processar mensagens com falhas, será necessário criar essa fila separadamente. Para isso, consulte Como enfileirar para análise extra os dados que não podem ser processados.

Se você precisar converter uma string JSON em um POJO usando AutoValue, registre um esquema para o tipo usando a anotação @DefaultSchema(AutoValueSchema.class). Depois, use a classe de utilitário Convert. O código resultante é semelhante ao seguinte:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Para mais informações, incluindo de quais tipos Java diferentes é possível inferir esquemas, consulte Como criar esquemas (em inglês).

Se a JsonToRow não funcionar com seus dados, o Gson será uma alternativa. A Gson é menos precisa no processamento padrão de dados. Isso pode exigir que você inclua mais validações no processo de conversão de dados.

Exemplos

Usar o gerador de código AutoValue para gerar POJOs

Os esquemas do Apache Beam geralmente são a melhor maneira de representar objetos em um pipeline, devido à maneira como permitem trabalhar com dados estruturados. No entanto, às vezes um objeto Java simples (POJO, na sigla em inglês) é necessário, como ao lidar com objetos de chave-valor ou com o estado do objeto. A criação manual de POJOs exige que você programe modificações para os métodos equals() e hashcode(), o que pode ser demorado e propenso a erros. Modificações incorretas podem resultar em um comportamento inconsistente do aplicativo ou na perda de dados.

Para gerar POJOs, use o builder de classes AutoValue. Essa opção garante que as substituições necessárias sejam usadas e permite evitar possíveis erros. O AutoValue é muito usado na base de código do Apache Beam. Portanto, a familiaridade com esse criador de classes é útil se você quiser desenvolver pipelines do Apache Beam no Dataflow usando Java.

Também é possível AutoValue com esquemas do Apache Beam adicionando uma anotação @DefaultSchema(AutoValueSchema.class). Para mais informações, consulte Como criar esquemas.

Para mais informações sobre AutoValue, consulte Por que AutoValue? e os documentos AutoValue.

Exemplo

Clickstream.java

Enfileirar dados não processáveis para análise mais profunda

Nos sistemas de produção, é importante lidar com dados problemáticos. Se possível, valide e corrija os dados in-stream. Quando não for possível fazer a correção, registre o valor em uma fila de mensagens não processadas, às vezes chamada de fila de mensagens inativas, para análise posterior. Problemas geralmente ocorrem ao converter dados de um formato para outro, por exemplo, ao converter strings JSON em Linhas.

Para resolver esse problema, use uma transformação de várias saídas para transferir os elementos que contêm os dados não processados para outra PCollection e analisar mais a fundo. Esse processamento é uma operação comum que pode ser usada em muitos lugares em um pipeline. Tente tornar a transformação genérica o suficiente para usar em vários lugares. Primeiro, crie um objeto de erro para unir propriedades comuns, incluindo os dados originais. Em seguida, crie uma transformação de coletor que tenha várias opções para o destino.

Exemplos

Aplicar transformações de validação de dados em série

Geralmente, os dados coletados de sistemas externos precisam ser organizados. Estruture o pipeline para que ele possa corrigir dados problemáticos in-stream quando possível. Quando necessário, envie os dados para uma fila de análise.

Como uma única mensagem pode apresentar vários problemas que precisam de correção, planeje o gráfico acíclico dirigido (DAG) necessário. Se um elemento contiver dados com vários erros, será preciso garantir que ele passe por todas as transformações necessárias.

Por exemplo, imagine um elemento com os seguintes valores, e nenhum deles deve ser nulo:

{"itemA": null,"itemB": null}

Verifique se o elemento passa pelas transformações que corrigem os dois problemas em potencial:

badElements.apply(fixItemA).apply(fixItemB)

Seu pipeline pode ter mais etapas seriais, mas a fusão ajuda a minimizar a sobrecarga de processamento introduzida.

Exemplo

ValidateAndCorrectCSEvt.java

Usar DoFn.StartBundle para fazer chamadas de microlote para serviços externos

Talvez seja necessário invocar APIs externas como parte do pipeline. Como um pipeline distribui o trabalho em muitos recursos de computação, fazer uma única chamada para cada elemento que flui pelo sistema pode sobrecarregar um endpoint de serviço externo. Esse problema é especialmente comum quando você não aplicou nenhuma função de redução.

Para evitar esse problema, faça chamadas em lote para sistemas externos.

É possível agrupar chamadas em lote usando uma transformação GroupByKey ou a API Timer do Apache Beam. No entanto, essas abordagens exigem o embaralhamento, que introduz uma sobrecarga de processamento e a necessidade de um número mágico para determinar a chave. espaço.

Em vez disso, use os elementos de ciclo de vida StartBundle e FinishBundle para agrupar seus dados. Com essas opções, o embaralhamento não é necessário.

Uma pequena desvantagem dessa opção é que os tamanhos dos pacotes são determinados dinamicamente pela implementação do executor com base no que está acontecendo no pipeline e nos workers dele. No modo de stream, os pacotes geralmente são pequenos. O agrupamento do Dataflow é influenciado por fatores de back-end, como o uso de fragmentação, a quantidade de dados disponíveis para uma chave específica e a capacidade do pipeline.

Exemplo

EventItemCorrectionService.java

Como usar um padrão de entrada secundária apropriado para enriquecimento de dados

Em aplicativos de análise de streaming, os dados geralmente são enriquecidos com informações adicionais que podem ser úteis para uma análise mais aprofundada. Por exemplo, se você tem o ID da loja para uma transação, pode querer adicionar informações sobre o local da loja. Essas informações adicionais geralmente são adicionadas usando um elemento e trazendo informações de uma tabela de consulta.

Para tabelas desse tipo que estão mudando lentamente e têm um tamanho menor, recomendamos trazer a tabela para o pipeline como uma classe singleton que implementa a interface Map<K,V>. Essa opção permite que você evite que cada elemento faça uma chamada de API para a pesquisa. Depois de adicionar uma cópia de uma tabela no pipeline, você precisará atualizá-la com frequência.

Para lidar com entradas secundárias de atualizações lentas, use os padrões de entrada secundária do Apache Beam.

Armazenamento em cache

As entradas secundárias são carregadas na memória e, portanto, são armazenadas em cache automaticamente.

Defina o tamanho do cache usando a opção --setWorkerCacheMb.

É possível compartilhar o cache entre instâncias DoFn e usar gatilhos externos para atualizá-lo.

Exemplo

SlowMovingStoreLocationDimension.java