Práticas recomendadas para fluxos de trabalho altamente paralelos

Esta página fornece orientações sobre as práticas recomendadas a serem seguidas ao criar e executar fluxos de trabalho altamente paralelos do HPC do Dataflow, incluindo como usar o código externo em seus pipelines, como executar o pipeline e gerenciar o tratamento de erros do Google Analytics.

Incluir código externo no pipeline

Um diferencial importante dos pipelines altamente paralelos é que eles usam o código C++ no DoFn em vez de uma das linguagens padrão do SDK do Apache Beam. Para pipelines Java, para facilitar o uso de bibliotecas C++ no pipeline, é recomendável usar chamadas de procedimento externo. Nesta seção, descrevemos a abordagem geral usada para executar códigos externos (C++) em pipelines Java.

Uma definição de pipeline do Apache Beam tem vários componentes importantes:

  • PCollections são coleções imutáveis de elementos homogêneos.
  • PTransforms são usados para definir as transformações para um PCollection que gera outro PCollection.
  • O pipeline é a construção que permite, por meio do código, declarar as interações entre PTransforms e PCollections. O pipeline é representado como um gráfico acíclico direcionado (DAG).

Ao usar o código de uma linguagem que não seja uma das linguagens padrão do SDK do Apache Beam, coloque o código no PTransform, que está dentro do DoFn, e use uma das Linguagens do SDK para definir o próprio pipeline. Recomendamos o uso do SDK do Apache Beam para Python para definir o pipeline, porque ele tem uma classe de utilitário que simplifica o uso de outro código. No entanto, é possível usar os outros SDKs do Apache Beam.

É possível usar o código para realizar testes rápidos sem a necessidade de uma compilação completa. Para um sistema de produção, você normalmente cria seus próprios binários, o que lhe dá a liberdade para ajustar o processo às suas necessidades.

O diagrama a seguir ilustra os dois usos de dados de canal:

  • dados usados para direcionar o processo
  • Dados adquiridos durante o processamento e mesclados aos dados do driver

Dois estágios de dados de pipeline

Neste artigo, os dados primários (da fonte) são referidos como dados de direcionamento e os dados secundários (da fase de processamento) são referidos como dados de mesclagem.

Em um caso de uso de serviços financeiros, os dados de direcionamento podem representar algumas centenas de milhares de negociações. Cada negociação precisa ser processada em conjunto com dados de mercado. Nesse caso, os dados de mercado são os dados de junção. Em um caso de uso de mídia, os dados de direcionamento podem ser arquivos de imagens que requerem processamento, mas não precisam de outras fontes de dados e, portanto, não usam dados de junção.

Considerações de tamanho para dados de direcionamento

Se o tamanho do elemento de dados de direcionamento estiver no intervalo de poucos megabytes, trate-o com o paradigma normal Apache do Beam. Basta criar um objeto PCollection a partir da origem e enviá-lo para as transformações do Apache Beam para processamento.

Se o tamanho do elemento de dados de direcionamento estiver no intervalo superior de megabytes ou em gigabytes, como costuma acontecer para cenários de mídia, é possível armazenar os dados no Cloud Storage. Em seguida, no objeto PCollection inicial, faça referência ao URI de armazenamento, e apenas uma referência de URI será utilizada para esses dados.

Considerações de tamanho para dados de junção

Se os dados de junção tiverem algumas centenas de megabytes ou menos, use uma entrada secundária para coletar esses dados para as transformações do Apache Beam. A entrada secundária envia o pacote de dados para todos os workers que precisam dele.

Se os dados de mesclagem estiverem no intervalo de gigabytes ou terabytes, use o Bigtable ou o Cloud Storage para mesclá-los aos dados de direcionamento, dependendo da natureza dos dados. O Bigtable é ideal para cenários financeiros em que os dados de mercado geralmente são acessados como pesquisas de chave-valor do Bigtable. Para mais informações sobre como projetar o esquema do Bigtable, incluindo recomendações para trabalhar com dados de série temporal, consulte a seguinte documentação do Bigtable:

Executar o código externo

Há várias maneiras de executar códigos externos no Apache Beam.

  • Crie um processo chamado a partir de um objeto DoFn dentro de uma transformação do Dataflow.

  • Use o JNI com o SDK para Java.

  • Crie um subprocesso diretamente do objeto DoFn. Embora essa abordagem não seja a mais eficiente, ela é robusta e simples de implementar. Nesta página, você aprenderá a usar uma chamada de subprocesso devido aos possíveis problemas com o JNI.

Ao projetar seu fluxo de trabalho, considere o pipeline completo. Ineficiências na execução do processo são compensadas pelo fato de que a movimentação de dados da fonte ao coletor é realizada em um único canal. Se você comparar essa abordagem com outras, analise os tempos e os custos completos do pipeline.

Extrair os binários para os hosts

Quando você usa uma linguagem nativa do Apache Beam, o SDK do Apache Beam move automaticamente todos os códigos necessários para os workers. Porém, ao fazer uma chamada para códigos externos, será necessário mover o código manualmente.

Arquivos binários armazenados em buckets

Para mover o código, faça o seguinte. O exemplo demonstra as etapas para o SDK do Apache Beam para Java.

  1. Armazene o código externo compilado junto com as informações de versão no Cloud Storage.
  2. No método @Setup, crie um bloco sincronizado para verificar se o arquivo de código está disponível no recurso local. Em vez de implementar uma verificação física, é possível confirmar a disponibilidade usando uma variável estática ao final do primeiro thread.
  3. Se o arquivo não estiver disponível, use a biblioteca de cliente do Cloud Storage para extrair o arquivo do bucket do Cloud Storage para o worker local. Uma abordagem recomendada é usar a classe FileSystems do Apache Beam para esta tarefa.
  4. Depois que o arquivo for movido, confirme se o bit de execução está definido no arquivo de código.
  5. Em um sistema de produção, verifique o hash dos binários para garantir que o arquivo tenha sido copiado corretamente.

Usar a função filesToStage do Apache Beam também é uma opção, mas remove algumas das vantagens da habilidade do executor para empacotar e mover o código Java automaticamente. Além disso, como a chamada para o subprocesso precisa de um local de arquivo absoluto, será necessário usar o código para determinar o caminho da classe e, portanto, o local do arquivo movido pelo filesToStage. Não recomendamos essa abordagem.

Executar os binários externos

Antes de executar códigos externos, é necessário criar um wrapper para eles. Escreva este wrapper na mesma linguagem que o código externo (por exemplo, C++) ou como um script shell. O wrapper permite passar identificadores de arquivo e implementar otimizações, conforme descrito na seção Processamento de design para ciclos pequenos de CPU desta página. O wrapper não precisa ser sofisticado. O snippet a seguir mostra um esboço de um wrapper em C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Este código lê dois parâmetros da lista de argumentos. O primeiro parâmetro é o local do arquivo de retorno para onde os dados são enviados. O segundo parâmetro são os dados ecoados pelo código ao usuário. (Em implementações reais, esse código faz mais do que ecoar uma mensagem de "Hello, world!")

Depois de escrever o código do wrapper, execute o código externo fazendo o seguinte:

  1. Transmita os dados para os binários de códigos externos.
  2. Execute os binários, detecte erros e registre erros e resultados.
  3. Edite as informações de registro.
  4. Colete os dados do processamento concluído.

Transmitir os dados para os binários

Para iniciar o processo de execução da biblioteca, os dados são transmitidos para o código C++. Nesta etapa, é possível aproveitar a integração do Dataflow com outras ferramentas do Google Cloud. Uma ferramenta como o Bigtable pode lidar com conjuntos de dados muito grandes e com acesso de baixa latência e alta simultaneidade, que permite que milhares de núcleos acessem simultaneamente o conjunto de dados. Além disso, no Bigtable os dados são pré-processados, o que torna possível a formatação, o aprimoramento e a filtragem de dados. Todo esse trabalho pode ser feito em transformações do Apache Beam antes da execução do código externo.

Para um sistema de produção, o caminho recomendado é usar um buffer de protocolo para encapsular os dados de entrada. É possível converter os dados de entrada em bytes e codificá-los em base64 antes de transmiti-los à biblioteca externa. As duas maneiras de transmitir esses dados para a biblioteca externa são as seguintes:

  • Dados de entrada pequenos. Para dados pequenos que não excedem o comprimento máximo do sistema para um argumento de comando, transmita o argumento na posição 2 do processo sendo construído com java.lang.ProcessBuilder.
  • Dados de entrada grandes. Para tamanhos de dados maiores, crie um arquivo que o nome inclua um UUID para conter os dados exigidos pelo processo.

Como executar o código C++, capturar erros e gerar registros

Capturar e tratar informações de erros é uma parte essencial do pipeline. Os recursos usados pelo executor do Dataflow são efêmeros e, muitas vezes, é difícil inspecionar os arquivos de registro do worker. Certifique-se de capturar e enviar todas as informações úteis à geração de registros do executor do Dataflow e de armazenar os dados de geração de registro em um ou mais buckets do Cloud Storage.

A abordagem recomendada é redirecionar stdout e stderr para os arquivos, o que permite evitar considerações de falta de memória. Por exemplo, no executor do Dataflow que chama o código C++, é possível incluir linhas como estas:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Processar informações de registro

Muitos casos de uso envolvem o processamento de milhões de elementos. O processamento bem-sucedido gera registros com pouco ou nenhum valor. Portanto, é necessário tomar uma decisão de negócios sobre como reter os dados de registros. Por exemplo, veja estas alternativas para reter todos os dados de registro:

  • Se as informações contidas nos registros de processamento bem-sucedido de elementos não tiverem valor, elas poderão ser descartadas.
  • Crie uma lógica de amostragem para os dados de registro, como amostragem a cada 10.000 entradas de registro. Se o processamento for homogêneo (ou seja, muitas iterações do código geram dados de registro essencialmente idênticos), essa abordagem fornecerá um equilíbrio efetivo entre a retenção de dados de registro e a otimização do processamento.

No caso de condições de falha, a quantidade de dados enviadas para os registros pode ser grande. Uma estratégia eficaz para gerenciar grandes quantidades de dados de registro de erros é ler as primeiras linhas da entrada de registro e enviar apenas essas linhas para o Cloud Logging. É possível carregar o restante do arquivo de registros nos buckets do Cloud Storage. Essa abordagem permite analisar as primeiras linhas dos registros de erros posteriormente e, se necessário, consultar o Cloud Storage para todo o arquivo.

Verificar o tamanho do arquivo de registro também é útil. Se o tamanho do arquivo for zero, você poderá ignorá-lo com segurança ou salvar uma mensagem simples de registro informando que o arquivo não continha dados.

Capturar dados de processamento concluído

Não é recomendado usar stdout para transmitir o resultado do cálculo de volta para a função DoFn. Outros códigos chamados pelo código C++, e até mesmo seu próprio código, também podem enviar mensagens para stdout, poluindo o streaming stdoutput que contém dados de geração de registro. Em vez disso, é melhor alterar o código do wrapper C++ para permitir que ele aceite um parâmetro indicando onde criar um arquivo que armazena o valor. Idealmente, esse arquivo é armazenado de maneira neutra quanto à linguagem, usando buffers de protocolo (em inglês), para que o código C++ mova um objeto de volta para o código Python. O objeto DoFn pode ler o resultado diretamente do arquivo e mover as informações do resultado para a própria chamada output.

É importante executar testes de unidade que gerenciem o processo em si. É importante também implementar um teste de unidade que execute o processo independentemente do pipeline do Dataflow. A depuração da biblioteca pode ser feita de maneira muito mais eficiente se for autônoma e não precisar executar todo o canal.

Processamento de design para ciclos pequenos de CPU

Chamar um subprocesso gera sobrecarga. Dependendo da sua carga de trabalho, podem ser necessárias medidas extras para reduzir a proporção entre o trabalho que está sendo executado e a sobrecarga administrativa de inicialização e encerramento do processo.

No caso de uso de mídia, o tamanho do elemento de dados de direção pode ser nos megabytes altos ou em gigabytes. Como resultado, o processamento de cada elemento de dados pode levar muitos minutos. Nesse caso, o custo de chamar o subprocesso é insignificante comparado ao tempo de processamento geral. A melhor abordagem para essa situação é ter um único elemento iniciando seu próprio processo.

No entanto, em outros casos de uso, como financeiro, o processamento requer unidades de tempo de CPU muito pequenas (dezenas de milissegundos). Nesse caso, a sobrecarga da chamada do subprocesso é desproporcionalmente grande. Uma solução para esse problema é usar a transformação GroupByKey do Apache Beam para criar lotes entre 50 e 100 elementos para alimentação do processo. Por exemplo, siga estas etapas:

  • Em uma função DoFn, crie um par de chave-valor. Se estiver processando transações financeiras, é possível usar o número da transação como chave. Se não houver um número exclusivo para ser usado como chave, gere um checksum a partir dos dados e use uma função de módulo para criar partições de 50 elementos.
  • Envie a chave para uma função GroupByKey.create, que retorna uma coleção KV<key,Iterable<data>> contendo os 50 elementos que podem ser enviados para o processo.

Limitar o paralelismo de workers

Quando você trabalha com uma linguagem que tem suporte nativo no executor do Dataflow, nunca precisa pensar no que está acontecendo com o worker. O Dataflow tem muitos processos que supervisionam o controle de fluxo e as linhas de execução no modo de lote ou de streaming.

No entanto, ao usar uma linguagem externa como C++, saiba que está fazendo algo um pouco fora do comum para a inicialização de subprocessos. No modo de lote, o executor do Dataflow usa uma pequena proporção de linhas de execução em operação para as CPUs, se comparado ao uso do modo de streaming. Especialmente no modo de streaming, recomenda-se a criação de um semáforo em sua classe para controlar mais diretamente o paralelismo de um worker individual.

Por exemplo, com o processamento de mídias, não convém que centenas de elementos de transcodificação sejam processados em paralelo por um único worker. Em casos como esses, é possível criar uma classe de utilitário que forneça permissões para a função DoFn relativas ao trabalho sendo realizado. Com o uso dessa classe, é possível assumir o controle direto das linhas de execução do worker no pipeline.

Usar coletores de dados de alta capacidade no Google Cloud

Após o processamento, os dados são enviados para um coletor de dados. O coletor precisa ser capaz de lidar com o volume dos resultados criados pela solução de processamento de grade.

O diagrama a seguir mostra alguns dos coletores disponíveis no Google Cloud quando o Dataflow está executando uma carga de trabalho de grade.

Coletores disponíveis no Google Cloud

O Bigtable, o BigQuery e o Cloud Pub/Sub conseguem lidar com grandes fluxos de dados. Por exemplo, cada nó do Bigtable pode gerenciar dez mil inserções de até 1 K por segundo, com fácil escalonabilidade horizontal. Como resultado, um cluster do Bigtable de 100 nós pode absorver 1.000.000 de mensagens por segundo geradas pela grade do Dataflow.

Gerenciar falhas de segmentação

Ao usar código C++ em um pipeline, você precisa decidir como gerenciar falhas, porque elas têm ramificações não locais se não forem tratadas corretamente. O executor do Dataflow cria processos conforme necessário em Java, Python ou Go e atribui o trabalho aos processos na forma de pacotes.

Se a chamada para o código C++ for feita com ferramentas fortemente acopladas, como JNI ou Cython, e com as falhas de processo C++, o processo de chamada e a máquina virtual Java (JVM) também falham. Nesse cenário, os pontos de dados inválidos não são capturáveis. Para tornar os pontos de dados inválidos capturáveis, use um acoplamento mais flexível, que ramifica os dados inválidos e permite que o pipeline continue. No entanto, com código C++ maduro que é totalmente testado em todas as variações de dados, é possível usar mecanismos como o Cython.

A seguir