Exatamente uma vez no Dataflow

O Dataflow oferece suporte ao processamento único de registros. Nesta página, explicamos como o Dataflow implementa exatamente uma vez o processamento, garantindo também baixa latência.

Visão geral

Pipelines em lote sempre usam processamento único. Os pipelines de streaming usam o processamento apenas uma vez por padrão, mas também podem usar o processamento pelo menos uma vez.

O processamento único oferece garantias sobre os resultados do processamento dos registros, incluindo os resultados de cada estágio do pipeline. Especificamente, para cada registro que chega ao pipeline a partir de uma origem ou que chega a um estágio após um estágio anterior, o Dataflow garante o seguinte:

  • O registro é processado e não é perdido.
  • Todos os resultados do processamento que permanecem no pipeline são refletidos no máximo uma vez.

Em outras palavras, os registros são processados pelo menos uma vez e os resultados são confirmados exatamente uma vez.

O processamento único garante que os resultados sejam precisos, sem registros duplicados na saída. O Dataflow é otimizado para minimizar a latência e manter a semântica "exatamente uma vez". No entanto, o processamento único ainda gera custos para eliminar a duplicação. Para casos de uso que toleram registros duplicados, geralmente é possível reduzir os custos e melhorar a latência ativando o modo pelo menos uma vez. Para mais informações sobre como escolher entre streaming exatamente uma vez ou pelo menos uma vez, consulte Definir o modo de streaming do pipeline.

Dados atrasados

O processamento único garante a precisão do pipeline: se o pipeline processar um registro, o Dataflow garantirá que o registro seja refletido na saída e que o registro não seja duplicado.

No entanto, em um pipeline de streaming, o processamento único não pode garantir que os resultados sejam concluídos, porque os registros podem chegar atrasados. Por exemplo, suponha que o pipeline execute uma agregação em uma janela de tempo, como Count. Com o processamento único, o resultado é preciso para os registros que chegam na janela em tempo hábil, mas os registros atrasados podem ser descartados.

Geralmente, não há como garantir a integridade em um pipeline de streaming porque, teoricamente, os registros podem chegar arbitrariamente atrasados. No caso de limitação, você precisaria esperar eternamente para produzir um resultado. De maneira mais prática, o Apache Beam permite configurar o limite para eliminar dados atrasados e quando emitir resultados agregados. Para mais informações, consulte Marcas-d'água e dados atrasados na documentação do Apache Beam.

Efeitos colaterais

Não há garantia de que os efeitos colaterais tenham exatamente uma semântica. É importante ressaltar que isso inclui gravar a saída em um armazenamento externo, a menos que o coletor também implemente semântica para exatamente uma vez.

Especificamente, o Dataflow não garante que cada registro passe por cada transformação exatamente uma vez. Devido a novas tentativas ou falhas de worker, o Dataflow pode enviar um registro por meio de uma transformação várias vezes ou até mesmo simultaneamente em vários workers.

Como parte do processamento único, o Dataflow elimina a duplicação das saídas. No entanto, se o código em uma transformação tiver efeitos colaterais, eles poderão ocorrer várias vezes. Por exemplo, se uma transformação fizer uma chamada de serviço remoto, essa chamada poderá ser feita várias vezes para o mesmo registro. Os efeitos colaterais podem até mesmo levar à perda de dados em algumas situações. Por exemplo, suponha que uma transformação leia um arquivo para produzir saída e, em seguida, exclua imediatamente o arquivo sem esperar que a saída seja confirmada. Se ocorrer um erro ao confirmar o resultado, o Dataflow tentará realizar a transformação novamente, mas agora a transformação não poderá ler o arquivo excluído.

Geração de registros

A saída de registro do processamento indica que o processamento ocorreu, mas não indica se os dados foram confirmados. Consequentemente, os arquivos de registro podem indicar que os dados foram processados várias vezes, mesmo que os resultados dos dados processados sejam confirmados no armazenamento permanente. Além disso, os registros nem sempre refletem dados processados e confirmados. Os registros podem ser descartados devido à limitação ou perdidos devido a outros problemas no serviço de geração de registros.

Streaming único

Nesta seção, explicamos como o Dataflow implementa o processamento único para jobs de streaming, incluindo como ele gerencia complexidades como processamento não determinístico, dados atrasados e código personalizado.

Embaralhamento de streaming do Dataflow

Os jobs de streaming do Dataflow são executados em muitos workers diferentes em paralelo, atribuindo intervalos de trabalho a cada worker. Embora as atribuições possam mudar ao longo do tempo em resposta a falhas do worker, escalonamento automático ou outros eventos, após cada transformação GroupByKey, todos os registros com a mesma chave são processados no mesmo worker. A transformação GroupByKey geralmente é usada por transformações compostas, como Count, FileIO e assim por diante. Para garantir que os registros de uma determinada chave acabem no mesmo worker, os workers do Dataflow embaralham dados entre si usando chamadas de procedimento remoto (RPCs).

Para garantir que os registros não sejam perdidos durante a reprodução aleatória, o Dataflow usa o backup upstream. Com o backup upstream, o worker que envia os registros repete as RPCs até receber uma confirmação positiva de que o registro foi recebido. Os efeitos colaterais do processamento do registro estão comprometidos com o armazenamento permanente downstream. Se o worker que envia os registros ficar indisponível, o Dataflow continuará tentando RPCs, o que garante que cada registro seja entregue pelo menos uma vez.

Como essas tentativas podem criar duplicatas, cada mensagem é marcada com um código exclusivo. Cada receptor armazena um catálogo de todos os IDs que já foram vistos e processados. Quando um registro é recebido, o Dataflow procura o código dele no catálogo. Se o ID for encontrado, o registro já foi recebido e confirmado e será descartado como uma cópia. Para garantir que os códigos de registro sejam estáveis, todas as saídas de etapa a etapa são verificadas no ponto de armazenamento. Como resultado, se a mesma mensagem for enviada várias vezes devido a chamadas RPC repetidas, a mensagem será confirmada apenas uma vez no armazenamento.

Como garantir baixa latência

Para que o processamento único seja viável, a E/S precisa ser reduzida, principalmente evitando a E/S em cada registro. Para alcançar esse objetivo, o Dataflow usa filtros de Bloom e coleta de lixo.

Filtros do Bloom

Os filtros do Bloom são estruturas de dados compactas que permitem verificações rápidas de conjunto de associação. No Dataflow, cada worker mantém um filtro Bloom de cada ID encontrado. Quando um novo ID de registro chega, o worker procura o ID no filtro. Se o filtro retornar "false", esse registro não é uma cópia e o worker não procura o ID no armazenamento estável.

O Dataflow mantém um conjunto de filtros Bloom contínuos agrupados por tempo. Quando um registro chega, o Dataflow escolhe o filtro apropriado para verificar com base no carimbo de data/hora do sistema. Essa etapa impede que os filtros do Bloom sejam saturados à medida que os filtros são coletados como lixo e também limita a quantidade de dados que precisam ser verificados na inicialização.

Coleta de lixo

Para evitar o preenchimento de IDs com registros, o Dataflow usa a coleta de lixo para remover registros antigos. O Dataflow usa o carimbo de data/hora do sistema para calcular uma marca-d'água de coleta de lixo.

Essa marca-d'água é baseada no tempo gasto em espera em um determinado estágio. Portanto, ele também fornece informações sobre quais partes do pipeline são lentas. Esses metadados são a base da métrica de atraso do sistema mostrada na interface de monitoramento do Dataflow.

Se um registro chegar com um carimbo de data/hora anterior à marca-d'água e os IDs desse tempo já tiverem sido coletados como lixo, o registro será ignorado. Como a marca d'água baixa que aciona a coleta de lixo não avança até que as entregas de registros sejam confirmadas, esses registros que chegam tardios são duplicados.

Fontes não deterministas

O Dataflow usa o SDK do Apache Beam para ler dados em pipelines. Se o processamento falhar, o Dataflow poderá repetir as leituras de uma origem. Nessa situação, o Dataflow precisa garantir que cada registro exclusivo produzido por uma origem seja gravado exatamente uma vez. Para fontes determinísticas, como Pub/Sub Lite ou Kafka, os registros são lidos com base em um deslocamento registrado, eliminando a necessidade dessa etapa.

Como o Dataflow não atribui códigos de registro automaticamente, as fontes não determinísticas precisam informar a ele quais são os códigos de registro para evitar duplicação. Quando uma origem fornece IDs exclusivos para cada registro, o conector usa uma ordem aleatória no pipeline para remover duplicatas. Os registros com o mesmo código são filtrados. Para um exemplo de como o Dataflow implementa o processamento único ao usar o Pub/Sub como origem, consulte a seção Eliminação de duplicação eficiente na página "Streaming com o Pub/Sub".

Quando você executa DoFns personalizadas como parte do pipeline, o Dataflow não garante que esse código seja executado apenas uma vez por registro. Para garantir pelo menos uma vez o processamento em caso de falhas de worker, o Dataflow pode executar um determinado registro por meio de uma transformação várias vezes ou executar o mesmo registro simultaneamente em vários workers. Se você incluir no código pipeline que realize tarefas como entrar em contato com um serviço externo, as ações poderão ser executadas mais de uma vez em um determinado registro.

Para tornar o processamento não determinístico de maneira eficaz, use o checkpoint. Quando você usa checkpoint, cada saída de uma transformação é transformada em checkpoint em um armazenamento estável com seu ID exclusivo antes de ser entregue ao próximo estágio. Tentativas na entrega aleatória do Dataflow de retransmissão da saída que foi marcada. O código pode ser executado várias vezes, mas o Dataflow garante que a saída de apenas uma dessas execuções seja armazenada. O Dataflow usa um armazenamento consistente que impede a gravação de duplicatas no armazenamento estável.

Exibição de saída exatamente uma vez

O SDK do Apache Beam inclui coletores integrados projetados para garantir que não gerem cópias. Sempre que possível, use um desses coletores integrados.

Se você precisar gravar seu próprio coletor, a melhor abordagem é tornar seu objeto de função idempotente para que possa ser Repita quantas vezes forem necessárias sem causar efeitos indesejados. No entanto, muitas vezes algum componente da transformação que implementa a funcionalidade do coletor não é determinístico e pode mudar se for repetido.

Por exemplo, em uma agregação em janela, o conjunto de registros pode ser não determinístico. Especificamente, a janela pode tentar disparar com o elemento e0, e1, e2. O worker pode falhar antes de confirmar o processamento da janela, mas não antes de enviar esses elementos como efeito colateral. Quando o worker é reiniciado, a janela é acionada novamente e um elemento atrasado e3 chega. Como esse elemento chega antes da confirmação da janela, ele não é contado como dados atrasados, portanto, o DoFn é chamado novamente com os elementos e0, e1, e2 e e3. Esses elementos são enviados à operação de efeito colateral. A idempotência não ajuda nesse cenário porque diferentes conjuntos de registros lógicos são enviados a cada vez.

Para lidar com a não determinação no Dataflow, use a transformação Reshuffle integrada. Quando o Dataflow embaralha dados, ele os grava de maneira durável para que qualquer elemento não determinista gerado fique estável caso novas operações sejam executadas após o embaralhamento. O uso da transformação Reshuffle garante que apenas uma versão da saída de DoFn possa ultrapassar o limite de embaralhamento. O padrão a seguir garante que a operação de efeito colateral sempre receba um registro determinístico de saída:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Para garantir que o executor do Dataflow saiba que os elementos precisam ser estáveis antes de executar um DoFn, adicione a anotação RequiresStableInput a DoFn.

Saiba mais