A aplicação de exemplo de comércio eletrónico demonstra as práticas recomendadas para usar o Dataflow para implementar a análise de dados de streaming e a IA em tempo real. O exemplo contém padrões de tarefas que mostram a melhor forma de realizar tarefas de programação Java. Estas tarefas são normalmente necessárias para criar aplicações de comércio eletrónico.
A aplicação contém os seguintes padrões de tarefas Java:
- Use esquemas do Apache Beam para trabalhar com dados estruturados
- Use JsonToRow para converter dados JSON
- Use o gerador de código
AutoValue
para gerar objetos Java simples (POJOs) - Coloque dados não processáveis em fila para análise posterior
- Aplique transformações de validação de dados em série
- Use
DoFn.StartBundle
para agrupar chamadas em micro lotes para serviços externos - Use um padrão de entrada lateral adequado
Use esquemas do Apache Beam para trabalhar com dados estruturados
Pode usar esquemas do Apache Beam para facilitar o processamento de dados estruturados.
A conversão dos seus objetos em linhas permite-lhe produzir código Java muito limpo, o que facilita o exercício de criação do gráfico acíclico orientado (DAG). Também pode fazer referência às propriedades de objetos como campos nas declarações de estatísticas que cria, em vez de ter de chamar métodos.
Exemplo
Use JsonToRow para converter dados JSON
O tratamento de strings JSON no Dataflow é uma necessidade comum. Por exemplo, as strings JSON são processadas quando são transmitidas informações de fluxo de cliques captadas a partir de aplicações Web. Para processar strings JSON, tem de as converter em linhas ou objetos Java simples (POJOs) durante o processamento da pipeline.
Pode usar a transformação incorporada do Apache Beam JsonToRow para converter strings JSON em linhas. No entanto, se quiser uma fila para processar mensagens sem êxito, tem de a criar separadamente. Consulte o artigo Colocar dados não processáveis em fila para análise adicional.
Se precisar de converter uma string JSON num POJO através do AutoValue,
registe um esquema para o tipo através da anotação @DefaultSchema(AutoValueSchema.class)
e, em seguida, use a classe de utilitários Convert. O código resultante é semelhante ao seguinte:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Para mais informações, incluindo os diferentes tipos de Java a partir dos quais pode inferir esquemas, consulte o artigo Criar esquemas.
Se JsonToRow não funcionar com os seus dados, Gson é uma alternativa. O Gson é relativamente flexível no processamento predefinido de dados, o que pode exigir que crie mais validação no processo de conversão de dados.
Exemplos
Use o gerador de código AutoValue
para gerar POJOs
Os esquemas do Apache Beam são frequentemente a melhor forma de representar objetos num pipeline, devido à forma como lhe permitem trabalhar com dados estruturados. No entanto, por vezes, é necessário um objeto Java simples (POJO), como quando se lida com objetos de chave-valor ou se processa o estado do objeto.
A criação manual de POJOs requer que codifique substituições para os métodos equals()
e hashcode()
, o que pode demorar muito tempo e estar sujeito a erros. As substituições incorretas podem resultar num comportamento inconsistente da aplicação ou na perda de dados.
Para gerar POJOs, use o criador de classes AutoValue
. Esta opção garante que são usadas as substituições necessárias e
permite evitar potenciais erros.
AutoValue
é muito usado no código base do Apache Beam, pelo que o conhecimento deste criador de classes é útil se quiser desenvolver pipelines do Apache Beam no Dataflow com Java.
Também pode AutoValue
com esquemas do Apache Beam se adicionar uma anotação @DefaultSchema(AutoValueSchema.class)
. Para mais informações, consulte o artigo
Criar esquemas.
Para mais informações sobre a funcionalidade AutoValue
,
consulte Por que motivo AutoValue?
e a documentação do AutoValue
.
Exemplo
Coloque dados não processáveis em fila para análise adicional
Nos sistemas de produção, é importante processar dados problemáticos. Se possível, valide e corrija os dados em fluxo. Quando a correção não é possível, registe o valor numa fila de mensagens não processadas, por vezes denominada fila de mensagens rejeitadas, para análise posterior. Os problemas ocorrem normalmente quando converte dados de um formato para outro, por exemplo, quando converte strings JSON em linhas.
Para resolver este problema, use uma transformação de várias saídas para transferir os elementos que contêm os dados não processados para outra PCollection para análise adicional. Este processamento é uma operação comum que pode querer usar em muitos locais numa pipeline. Tente tornar a transformação genérica o suficiente para ser usada em vários locais. Primeiro, crie um objeto de erro para incluir propriedades comuns, incluindo os dados originais. Em seguida, crie uma transformação de destino que tenha várias opções para o destino.
Exemplos
Aplique transformações de validação de dados em série
Os dados recolhidos de sistemas externos precisam frequentemente de limpeza. Estruture o pipeline de forma a poder corrigir dados problemáticos na stream, sempre que possível. Enviar os dados para uma fila para análise adicional quando necessário.
Uma vez que uma única mensagem pode sofrer de vários problemas que precisam de correção, planeie o gráfico acíclico dirigido (DAG) necessário. Se um elemento contiver dados com vários defeitos, tem de garantir que o elemento passa pelas transformações adequadas.
Por exemplo, imagine um elemento com os seguintes valores, nenhum dos quais deve ser nulo:
{"itemA": null,"itemB": null}
Certifique-se de que o elemento flui através de transformações que corrigem ambos os potenciais problemas:
badElements.apply(fixItemA).apply(fixItemB)
O seu pipeline pode ter mais passos em série, mas a fusão ajuda a minimizar a sobrecarga de processamento introduzida.
Exemplo
Use DoFn.StartBundle
para processar chamadas em micro lotes para serviços externos
Pode ter de invocar APIs externas como parte do seu pipeline. Uma vez que um pipeline distribui o trabalho por muitos recursos de computação, fazer uma única chamada para cada elemento que flui através do sistema pode sobrecarregar um ponto final de serviço externo. Este problema é particularmente comum quando não aplicou funções de redução.
Para evitar este problema, agrupe as chamadas para sistemas externos.
Pode agrupar chamadas usando uma transformação GroupByKey
ou a API Apache Beam Timer. No entanto, ambas as abordagens requerem a mistura, o que introduz alguma sobrecarga de processamento e a necessidade de um número mágico para determinar o espaço de chaves.
Em alternativa, use os elementos do ciclo de vida
StartBundle
e
FinishBundle
para agrupar os seus dados. Com estas opções, não é necessário misturar.
Uma pequena desvantagem desta opção é que os tamanhos dos pacotes são determinados dinamicamente pela implementação do executor com base no que está a acontecer atualmente no pipeline e nos respetivos trabalhadores. No modo in-stream, os pacotes são frequentemente pequenos. O agrupamento do fluxo de dados é influenciado por fatores de back-end, como a utilização da divisão horizontal, a quantidade de dados disponíveis para uma chave específica e o débito do pipeline.
Exemplo
EventItemCorrectionService.java
Use um padrão de entrada lateral adequado para o enriquecimento de dados
Nas aplicações de estatísticas de streaming, os dados são frequentemente enriquecidos com informações adicionais que podem ser úteis para uma análise mais detalhada. Por exemplo, se tiver o ID da loja de uma transação, pode querer adicionar informações sobre a localização da loja. Estas informações adicionais são frequentemente adicionadas através da obtenção de um elemento e da introdução de informações de uma tabela de consulta.
Para tabelas de pesquisa que mudam lentamente e são mais pequenas,
trazer a tabela para o pipeline como uma classe singleton que
implementa a interface Map<K,V>
funciona bem. Esta opção permite-lhe evitar que cada elemento faça uma chamada API para a respetiva procura. Depois de incluir uma cópia de uma tabela no pipeline, tem de a atualizar periodicamente para a manter atualizada.
Para processar entradas laterais de atualização lenta, use os padrões de entrada lateral do Apache Beam.
A colocar em cache
As entradas laterais são carregadas na memória e, por isso, são colocadas automaticamente em cache.
Pode definir o tamanho da cache através da opção --setWorkerCacheMb
.
Pode partilhar a cache em várias instâncias do DoFn
e usar acionadores externos para atualizar a cache.
Exemplo
SlowMovingStoreLocationDimension.java