Esta página fornece orientações sobre as práticas recomendadas a seguir ao criar e executar fluxos de trabalho altamente paralelos de HPC do Dataflow, incluindo como usar código externo nos seus pipelines, como executar o pipeline e como gerir o processamento de erros.
Inclua código externo no seu pipeline
Um fator de diferenciação fundamental para pipelines altamente paralelos é que usam código C++ no
DoFn
em vez de um dos idiomas padrão do SDK do Apache Beam. Para pipelines Java, para facilitar a utilização de bibliotecas C++ no pipeline, recomenda-se que use chamadas de procedimentos externos. Esta secção descreve a abordagem geral usada para executar código externo (C++) em pipelines Java.
Uma definição de pipeline do Apache Beam tem vários componentes principais:
PCollections
são coleções imutáveis de elementos homogéneos.PTransforms
são usados para definir as transformações a umPCollection
que gera outroPCollection
.- O pipeline é a construção que lhe permite, através de código, declarar as interações entre o
PTransforms
e oPCollections
. O pipeline é representado como um gráfico acíclico orientado (DAG).
Quando usa código de um idioma que não é um dos idiomas padrão do SDK do Apache Beam, coloque o código em PTransform
, que está dentro de DoFn
, e use um dos idiomas padrão do SDK para definir o próprio pipeline.
Recomendamos que use o SDK Python do Apache Beam para definir o pipeline,
porque o SDK Python tem uma classe de utilidade que simplifica a utilização de outro código. No entanto, pode usar os outros SDKs do Apache Beam.
Pode usar o código para realizar experiências rápidas sem precisar de uma compilação completa. Para um sistema de produção, normalmente, cria os seus próprios ficheiros binários, o que lhe dá a liberdade de ajustar o processo às suas necessidades.
O diagrama seguinte ilustra as duas utilizações dos dados do pipeline:
- Os dados são usados para impulsionar o processo.
- Os dados são adquiridos durante o processamento e associados aos dados do condutor.
Nesta página, os dados principais (da origem) são denominados dados de base e os dados secundários (da fase de processamento) são denominados dados de junção.
Num exemplo de utilização de finanças, os dados de condução podem ser de algumas centenas de milhares de transações. Cada negociação tem de ser processada em conjunto com os dados de mercado. Nesse caso, os dados de mercado são os dados de associação. Num exemplo de utilização de multimédia, os dados de base podem ser ficheiros de imagens que requerem processamento, mas não precisam de outras origens de dados e, por isso, não usam dados de junção.
Considerações sobre o tamanho dos dados de condução
Se o tamanho do elemento de dados determinante estiver no intervalo de poucos megabytes, trate-o com o paradigma normal do Apache Beam de criar um objeto PCollection
a partir da origem e enviar o objeto para as transformações do Apache Beam para processamento.
Se o tamanho do elemento de dados determinante estiver na ordem dos megabytes ou gigabytes, como é habitual para multimédia, pode colocar os dados determinantes no Cloud Storage. Em seguida, no objeto PCollection
inicial, referencie o URI de armazenamento e use apenas uma referência de URI a esses dados.
Considerações sobre o tamanho para combinar dados
Se os dados de junção tiverem algumas centenas de megabytes ou menos, use uma entrada lateral para enviar estes dados para as transformações do Apache Beam. A entrada lateral envia o pacote de dados a todos os trabalhadores que precisam dele.
Se os dados de junção estiverem no intervalo de gigabytes ou terabytes, use o Bigtable ou o Cloud Storage para unir os dados de junção aos dados de condução, consoante a natureza dos dados. O Bigtable é ideal para cenários financeiros em que os dados de mercado são frequentemente acedidos como pesquisas de chave-valor do Bigtable. Para mais informações sobre a conceção do esquema do Bigtable, incluindo recomendações para trabalhar com dados de séries cronológicas, consulte a seguinte documentação do Bigtable:
Executar o código externo
Pode executar código externo no Apache Beam de várias formas.
Crie um processo que é chamado a partir de um objeto
DoFn
dentro de uma transformação do Dataflow.Use JNI com o SDK Java.
Crie um subprocesso diretamente a partir do objeto
DoFn
. Embora esta abordagem não seja a mais eficiente, é robusta e simples de implementar. Devido aos potenciais problemas com a utilização da JNI, esta página demonstra a utilização de uma chamada de subprocesso.
Ao criar o fluxo de trabalho, considere o pipeline integral completo. Quaisquer ineficiências na forma como o processo é executado são compensadas pelo facto de o movimento de dados da origem até ao destino ser realizado com um único pipeline. Se comparar esta abordagem com outras, analise os tempos ponto a ponto do pipeline, bem como os custos ponto a ponto.
Transfira os ficheiros binários para os anfitriões
Quando usa uma linguagem nativa do Apache Beam, o SDK do Apache Beam move automaticamente todo o código necessário para os trabalhadores. No entanto, quando faz uma chamada para código externo, tem de mover o código manualmente.
Para mover o código, faça o seguinte. O exemplo demonstra os passos para o SDK Java do Apache Beam.
- Armazene o código externo compilado, juntamente com as informações de controlo de versões, no Cloud Storage.
- No método
@Setup
, crie um bloco sincronizado para verificar se o ficheiro de código está disponível no recurso local. Em vez de implementar uma verificação física, pode confirmar a disponibilidade através de uma variável estática quando a primeira união terminar. - Se o ficheiro não estiver disponível, use a biblioteca de cliente do Cloud Storage para
extrair o ficheiro do contentor do Cloud Storage para o trabalhador local. Uma abordagem recomendada é usar a classe
FileSystems
do Apache Beam para esta tarefa. - Depois de mover o ficheiro, confirme se o bit de execução está definido no ficheiro de código.
- Num sistema de produção, verifique o hash dos ficheiros binários para garantir que o ficheiro foi copiado corretamente.
A utilização da função Apache Beam
filesToStage
também é uma opção, mas remove algumas das vantagens da capacidade do executor de criar pacotes e mover automaticamente o seu código Java. Além disso, uma vez que a chamada para o subprocesso precisa de uma localização absoluta do ficheiro, tem de usar código para determinar o caminho da classe e, por conseguinte, a localização do ficheiro movido pelo filesToStage
. Não recomendamos esta abordagem.
Execute os ficheiros binários externos
Antes de poder executar código externo, tem de criar um wrapper para o mesmo. Escreva este wrapper no mesmo idioma que o código externo (por exemplo, C++) ou como um script de shell. O wrapper permite-lhe transmitir identificadores de ficheiros e implementar otimizações, conforme descrito na secção Conceba o processamento para pequenos ciclos de CPU nesta página. O wrapper não tem de ser sofisticado. O fragmento seguinte mostra um esboço de um wrapper em C++.
int main(int argc, char* argv[])
{
if(argc < 3){
std::cerr << "Required return file and data to process" << '\n';
return 1;
}
std::string returnFile = argv[1];
std::string word = argv[2];
std::ofstream myfile;
myfile.open (returnFile);
myfile << word;
myfile.close();
return 0;
}
Este código lê dois parâmetros da lista de argumentos. O primeiro parâmetro é a localização do ficheiro de retorno para onde os dados são enviados. O segundo parâmetro é o dado que o código envia ao utilizador. Nas implementações do mundo real, este código faria mais do que repetir "Olá, mundo"!
Depois de escrever o código de wrapper, execute o código externo da seguinte forma:
- Transmitir os dados para os binários de código externos.
- Executar os ficheiros binários, detetar erros e registar erros e resultados.
- Tratar as informações de registo.
- Capture dados do processamento concluído.
Transmitir os dados aos ficheiros binários
Para iniciar o processo de execução da biblioteca, transmita dados para o código C++. É neste passo que pode tirar partido da integração do Dataflow com outras ferramentas da Google Cloud Platform. Uma ferramenta como o Bigtable pode lidar com conjuntos de dados muito grandes e processar o acesso de baixa latência e alta simultaneidade, o que permite que milhares de núcleos acedam simultaneamente ao conjunto de dados. Além disso, o Bigtable pode pré-processar dados, o que permite a modelagem, o enriquecimento e a filtragem de dados. Todo este trabalho pode ser feito em transformações do Apache Beam antes de executar o código externo.
Para um sistema de produção, o caminho recomendado é usar um protocol buffer para encapsular os dados de entrada. Pode converter os dados de entrada em bytes e codificá-los em base64 antes de os transmitir à biblioteca externa. As duas formas de transmitir estes dados para a biblioteca externa são as seguintes:
- Dados de entrada pequenos. Para dados pequenos que não excedam o comprimento máximo do sistema para um argumento de comando, transmita o argumento na posição 2 do processo que está a ser criado com
java.lang.ProcessBuilder
. - Dados de entrada grandes. Para tamanhos de dados maiores, crie um ficheiro cujo nome inclua um UUID para conter os dados necessários para o processo.
Executar o código C++, detetar erros e registar
A captura e o processamento de informações de erros são uma parte fundamental do seu pipeline. Os recursos usados pelo executor do Dataflow são temporários e, muitas vezes, é difícil inspecionar os ficheiros de registo dos trabalhadores. Tem de se certificar de que captura e envia todas as informações úteis para o registo do executor do Dataflow e de que armazena os dados de registo num ou mais contentores do Cloud Storage.
A abordagem recomendada é redirecionar stdout
e stderr
para ficheiros, o que
permite evitar considerações de falta de memória. Por exemplo, no
Dataflow Runner que chama o código C++, pode incluir linhas
como as seguintes:
Java
import java.lang.ProcessBuilder.Redirect;
...
processbuilder.redirectError(Redirect.appendTo(errfile));
processbuilder.redirectOutput(Redirect.appendTo(outFile));
Python
# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
integers
| beam.Map(collatz.total_stopping_time).with_exception_handling(
use_subprocess=True))
# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
os.path.splitext(output_path)[0] + '-bad.txt')
Trate as informações de registo
Muitos exemplos de utilização envolvem o processamento de milhões de elementos. O processamento bem-sucedido gera registos com pouco ou nenhum valor, pelo que tem de tomar uma decisão empresarial sobre a retenção dos dados de registo. Por exemplo, considere estas alternativas à retenção de todos os dados de registo:
- Se as informações contidas nos registos do processamento de elementos bem-sucedido não forem valiosas, não as mantenha.
- Crie uma lógica que faça a amostragem dos dados de registo, como a amostragem apenas de cada 10 000 entradas de registo. Se o processamento for homogéneo, como quando muitas iterações do código geram dados de registo essencialmente idênticos, esta abordagem oferece um equilíbrio eficaz entre a retenção de dados de registo e a otimização do processamento.
Para condições de falha, a quantidade de dados transferidos para os registos pode ser grande. Uma estratégia eficaz para processar grandes quantidades de dados de registo de erros consiste em ler as primeiras linhas da entrada do registo e enviar apenas essas linhas para o Cloud Logging. Pode carregar o resto do ficheiro de registo para contentores do Cloud Storage. Esta abordagem permite-lhe analisar as primeiras linhas dos registos de erros mais tarde e, se necessário, consultar o Cloud Storage para ver o ficheiro completo.
Verificar o tamanho do ficheiro de registo também é útil. Se o tamanho do ficheiro for zero, pode ignorá-lo com segurança ou registar uma mensagem de registo simples a indicar que o ficheiro não tinha dados.
Capture dados do processamento concluído
Não recomendamos que use stdout
para transmitir o resultado do cálculo de volta para a função DoFn
. Outro código que o seu código C++ chama, e até o seu próprio código, também pode enviar mensagens para stdout
, poluindo a stream stdoutput
que, de outra forma, contém dados de registo. Em alternativa, é uma prática melhor alterar o código do wrapper C++ para permitir que o código aceite um parâmetro que indique onde criar o ficheiro que armazena o valor. Idealmente, este ficheiro deve ser armazenado de forma independente do idioma através de buffers de protocolo, o que permite que o código C++ transmita um objeto de volta para o código Java ou Python. O objeto DoFn
pode ler o resultado diretamente do ficheiro e transmitir as informações
do resultado para a sua própria chamada output
.
A experiência demonstrou a importância de executar testes unitários relacionados com o próprio processo. É importante implementar um teste de unidade que execute o processo independentemente do pipeline do Dataflow. A depuração da biblioteca pode ser feita de forma muito mais eficiente se for autónoma e não tiver de executar todo o pipeline.
Processamento de design para ciclos de CPU pequenos
Chamar um subprocesso tem sobrecarga. Consoante a sua carga de trabalho, pode ter de fazer trabalho adicional para reduzir a proporção entre o trabalho realizado e os custos administrativos de iniciar e terminar o processo.
No exemplo de utilização de multimédia, o tamanho do elemento de dados determinante pode estar na ordem dos megabytes ou gigabytes. Consequentemente, o processamento de cada elemento de dados pode demorar muitos minutos. Nesse caso, o custo de chamar o subprocesso é insignificante em comparação com o tempo de processamento geral. A melhor abordagem nesta situação é ter um único elemento a iniciar o seu próprio processo.
No entanto, noutros exemplos de utilização, como as finanças, o processamento requer unidades muito pequenas de tempo de CPU (dezenas de milissegundos). Nesse caso, a sobrecarga de chamar o subprocesso é desproporcionadamente grande. Uma solução para este problema é usar a transformação GroupByKey
do Apache Beam para criar lotes de entre 50 e 100 elementos a serem introduzidos no processo. Por exemplo, pode seguir estes passos:
- Numa função
DoFn
, crie um par de chave-valor. Se estiver a processar transações financeiras, pode usar o número da transação como chave. Se não tiver um número exclusivo para usar como chave, pode gerar uma soma de verificação a partir dos dados e usar uma função de módulo para criar partições de 50 elementos. - Envie a chave para uma função
GroupByKey.create
, que devolve uma coleçãoKV<key,Iterable<data>>
que contém os 50 elementos que pode enviar para o processo.
Limite o paralelismo dos trabalhadores
Quando trabalha com um idioma suportado nativamente no executor do Dataflow, nunca tem de pensar no que está a acontecer ao trabalhador. O Dataflow tem muitos processos que supervisionam o controlo de fluxo e as linhas de execução no modo de lote ou de stream.
No entanto, se estiver a usar uma linguagem externa, como C++, tenha em atenção que está a fazer algo um pouco fora do comum ao iniciar subprocessos. No modo de lote, o executor do Dataflow usa uma pequena proporção de threads de trabalho para CPUs em comparação com o modo de streaming. Recomendamos, especialmente no modo de streaming, que crie um semáforo na sua classe para controlar mais diretamente o paralelismo de um trabalhador individual.
Por exemplo, no processamento de multimédia, pode não querer que centenas de elementos de transcodificação sejam processados em paralelo por um único trabalhador. Em casos como esses, pode criar uma classe de utilidade que forneça autorizações à função DoFn
para o trabalho que está a ser realizado. A utilização desta classe permite-lhe assumir o controlo direto
dos threads de trabalho no seu pipeline.
Use destinos de dados de alta capacidade na Google Cloud Platform
Depois de processados, os dados são enviados para um destino de dados. O destino tem de conseguir processar o volume de resultados criados pela sua solução de processamento de grelhas.
O diagrama seguinte mostra algumas das origens disponíveis na Google Cloud Platform quando o Dataflow está a executar uma carga de trabalho de grelha.
O Bigtable, o BigQuery e o Pub/Sub podem processar streams de dados muito grandes. Por exemplo, cada nó do Bigtable pode processar 10 000 inserções por segundo de até 1 KB de tamanho com escalabilidade horizontal fácil. Como resultado, um cluster do Bigtable com 100 nós pode absorver 1 000 000 de mensagens por segundo geradas pela grelha do Dataflow.
Faça a gestão de falhas de segmentação
Quando usa código C++ num pipeline, tem de decidir como gerir as falhas de segmentação, porque têm ramificações não locais se não forem tratadas corretamente. O executor do Dataflow cria processos conforme necessário em Java, Python ou Go e, em seguida, atribui trabalho aos processos sob a forma de pacotes.
Se a chamada ao código C++ for feita através de ferramentas fortemente acopladas, como JNI ou Cython, e o processo C++ falhar, o processo de chamada e a máquina virtual Java (JVM) também falham. Neste cenário, não é possível detetar pontos de dados incorretos. Para tornar os pontos de dados incorretos detetáveis, use uma associação mais flexível, que ramifica os dados incorretos e permite que o pipeline continue. No entanto, com código C++ desenvolvido que seja totalmente testado em todas as variações de dados, pode usar mecanismos como o Cython.
O que se segue?
Siga o tutorial para criar um pipeline que use contentores personalizados com bibliotecas C++.
Veja o código de exemplo desta página no repositório do GitHub do Apache Beam.
Saiba mais sobre como criar pipelines com o Apache Beam.