Implantar um pipeline

Neste documento, explicamos em detalhes como o Dataflow implanta e executa um pipeline e abrange tópicos avançados, como otimização e balanceamento de carga. Se você estiver procurando um guia passo a passo sobre como criar e implantar seu primeiro pipeline, use os guias de início rápido do Dataflow para Java , Python ou modelos.

Depois de construir e testar o pipeline do Apache Beam, é possível usar o serviço gerenciado do Dataflow para implantá-lo e executá-lo. No serviço do Dataflow, o código do pipeline torna-se um job do Dataflow.

O serviço do Dataflow gerencia totalmente serviços do Google Cloud como o Compute Engine e o Cloud Storage para executar um job do Dataflow, ativando e eliminando automaticamente os recursos necessários. Com o serviço do Dataflow, é possível visualizar o job com as ferramentas Interface de monitoramento do Dataflow e Interface de linha de comando do Dataflow.

Definir parâmetros de execução no código do pipeline permite controlar alguns aspectos de como o serviço do Dataflow executa o job. Por exemplo, os parâmetros de execução especificam se as etapas do pipeline são executadas em máquinas virtuais de worker, no back-end do serviço Dataflow ou localmente.

Além de gerenciar recursos do Google Cloud, o serviço do Dataflow executa e otimiza automaticamente muitos aspectos do processamento paralelo distribuído. São eles:

  • Carregamento em paralelo e distribuição. O Dataflow particiona automaticamente os dados e distribui o código do trabalhador nas instâncias do Compute Engine para processamento paralelo.
  • Otimização. O Dataflow usa o código do pipeline para criar um gráfico de execução que representa as PCollections e transformações, e otimiza esse gráfico para ter o desempenho e o uso de recursos mais eficientes. Além disso, o Dataflow otimiza automaticamente as operações de custo potencialmente alto, como agregações de dados.
  • Recursos de ajuste automático. O serviço Dataflow inclui vários recursos que fornecem ajuste rápido de alocação de recursos e particionamento de dados, como escalonamento automático e reequilíbrio dinâmico de trabalho. Esses recursos ajudam o serviço do Dataflow a executar o job da forma mais rápida e eficiente possível.

Ciclo de vida do pipeline: do código do pipeline ao job do Dataflow

Quando você executa o pipeline do Dataflow, o Dataflow cria um gráfico de execução a partir do código que constrói o objeto Pipeline, incluindo todas as transformações e as respectivas funções de processamento, como DoFn. Essa fase é chamada de tempo de construção do gráfico e é executada localmente no computador em que o pipeline é executado.

Durante a construção do gráfico, o Apache Beam executa localmente o código a partir do ponto de entrada principal do código do pipeline, interrompendo as chamadas para uma etapa de origem, coletor ou transformação e transformando essas chamadas em nós do gráfico. Como consequência, uma parte do código em um ponto de entrada do pipeline (método main() do Java ou o nível superior de um script Python) é executada localmente na máquina que executa o pipeline, enquanto o mesmo código declarado em um método de um objeto DoFn é executado nos workers do Dataflow.

Durante a construção do gráfico, o Apache Beam também valida que os recursos referenciados pelo pipeline (como buckets do Cloud Storage, tabelas do BigQuery e tópicos ou assinaturas do Pub/Sub) realmente existem e são acessíveis. A validação é feita por meio de chamadas de API padrão para os respectivos serviços. Portanto, é essencial que a conta de usuário usada para executar um pipeline tenha conectividade adequada com os serviços necessários e esteja autorizada a chamar as APIs. Antes de enviar o pipeline para o serviço Dataflow, o Apache Beam também verifica se há outros erros e garante que o gráfico do pipeline não contenha operações ilegais.

O gráfico de execução é convertido para o formato JSON e transmitido para o endpoint do serviço do Dataflow.

Observação: a construção do gráfico também acontece ao executar o pipeline no local, mas o gráfico não é convertido em JSON nem transmitido para o serviço. Em vez disso, é executado localmente na mesma máquina na qual o programa Dataflow foi iniciado. Para mais detalhes, consulte a documentação sobre configuração para execução local.

Em seguida, o serviço do Dataflow valida o gráfico de execução em JSON. Quando o gráfico é validado, torna-se um job no serviço do Dataflow. Use a Interface de monitoramento do Dataflow para ver o job, o gráfico de execução, o status e as informações de registro.

Java: SDK 2.x

O serviço Dataflow envia uma resposta para a máquina que executa o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineJob, que contém o jobId do job do Dataflow. Use o jobId para monitorar, rastrear e solucionar problemas do job com a Interface de monitoramento do Dataflow e a Interface de linha de comando do Dataflow. Consulte a referência da API de DataflowPipelineJob para mais informações.

Python

O serviço Dataflow envia uma resposta para a máquina que executa o programa do Dataflow. Essa resposta é encapsulada no objeto DataflowPipelineResult, que contém o job_id do job do Dataflow. Use o job_id para monitorar, rastrear e solucionar problemas do job com a Interface de monitoramento do Dataflow e a Interface de linha de comando Dataflow.

Java: SDK 1.x

Gráfico de execução

O Dataflow cria um gráfico de etapas que representa o pipeline, com base nas transformações e nos dados usados na construção do objeto Pipeline. Esse é o gráfico de execução do pipeline.

O exemplo de WordCount incluído nos SDKs do Apache Beam contém uma série de transformações para ler, extrair, contar, formatar e gravar palavras específicas em um conjunto de textos, junto com uma contagem de ocorrências de cada palavra. O seguinte diagrama mostra como as transformações do pipeline WordCount são expandidas em um gráfico de execução:

As transformações no programa de exemplo WordCount são expandidas em um gráfico de execução
              de etapas a serem executadas pelo serviço Dataflow.
Figura 1: gráfico de execução do exemplo WordCount

O gráfico de execução geralmente difere da ordem em que as transformações foram especificadas quando o pipeline foi construído. Isso ocorre porque o serviço Dataflow faz várias otimizações e fusões no gráfico de execução antes de executar recursos de nuvem gerenciados. O serviço Dataflow respeita as dependências de dados na execução do pipeline. No entanto, as etapas sem dependências de dados entre si podem ser executadas em qualquer ordem.

Selecione o job na Interface de monitoramento do Dataflow para ver o gráfico de execução não otimizado que o Dataflow gerou para o pipeline.

Carregamento em paralelo e distribuição

O serviço do Dataflow carrega em paralelo e distribui automaticamente a lógica de processamento no pipeline para os workers que você alocou para executar o job. O Dataflow usa as abstrações do modelo de programação para representar funções de processamento paralelas. Por exemplo, as transformações ParDo fazem o Dataflow distribuir automaticamente o código de processamento (representado pelas DoFns) para que vários workers sejam executados em paralelo.

Como estruturar o código de usuário

Pense no código DoFn como pequenas entidades independentes: várias instâncias podem estar em execução em diferentes máquinas, sem que haja conhecimento umas das outras. Assim, as funções puras, aquelas que não dependem do estado oculto ou externo, são deterministas e não apresentam efeitos colaterais observáveis, são o código ideal para a natureza paralela e distribuída de DoFns.

O modelo de função pura não é estritamente rígido. No entanto, as informações de estado ou os dados de inicialização externos podem ser válidos para DoFn e outros objetos de função, desde que o código não dependa de coisas que o serviço do Dataflow não garante. Ao estruturar transformações ParDo e criar DoFns, lembre-se das seguintes diretrizes:

  • O Dataflow garante que todos os elementos da PCollection de entrada sejam processados por uma instância de DoFn apenas uma vez.
  • O serviço do Dataflow não garante a quantidade de vezes que um DoFn será invocado.
  • O serviço Dataflow não garante exatamente como os elementos distribuídos são agrupados, ou seja, não garante quais elementos, se houver, serão processados juntos.
  • O serviço do Dataflow não garante o número exato de instâncias de DoFn que serão criadas no decurso de um pipeline.
  • O serviço do Dataflow é tolerante a falhas e pode tentar novamente o código várias vezes em caso de problemas no worker. Além disso, o serviço do Dataflow pode criar cópias de backup do código e ter problemas com efeitos colaterais manuais, por exemplo, se o código depender de arquivos temporários ou criá-los com nomes não exclusivos.
  • O serviço do Dataflow serializa o processamento de elemento por instância de DoFn. Embora o código não precise ser estritamente seguro para linhas de execução, os estados compartilhados entre várias instâncias de DoFn precisam ser.

Consulte os requisitos para funções fornecidas pelo usuário (em inglês) na documentação do modelo de programação para mais informações sobre como criar o código de usuário.

Tratamento de erros e exceções

O pipeline pode lançar exceções durante o processamento de dados. Alguns desses erros são transitórios, por exemplo, dificuldade temporária em acessar um serviço externo, mas outros são permanentes, como erros causados pela entrada de dados corruptos ou não analisáveis, ou ponteiros nulos durante a computação.

O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo quando um erro é gerado para qualquer elemento nesse pacote. Na execução no modo de lote, os pacotes que incluem um item com falha são repetidos quatro vezes. A falha do pipeline será total quando um único pacote falhar quatro vezes. Em execuções no modo de streaming, um pacote que inclui um item com falha é repetido indefinidamente, o que pode causar a parada permanente do pipeline.

Otimização de fusão

Depois que o formato JSON do gráfico de execução do pipeline é validado, o serviço do Dataflow pode modificar o gráfico para fazer otimizações. Essas otimizações podem incluir a fusão de várias etapas ou transformações do gráfico de execução do canal em etapas únicas. A fusão de etapas evita que o serviço do Dataflow precise materializar cada PCollection intermediária no pipeline, o que pode custar caro em termos de memória e processamento.

Todas as transformações especificadas na construção do pipeline são executadas no serviço, mas elas podem ser executadas em uma ordem diferente ou como parte de uma transformação fusionada maior para garantir a execução mais eficiente do pipeline. O serviço do Dataflow respeita as dependências de dados entre as etapas no gráfico de execução. No entanto, em outras situações, as etapas podem ser executadas em qualquer ordem.

Exemplo de fusão

O diagrama a seguir mostra como o gráfico de execução do exemplo WordCount incluído no SDK do Apache Beam para Java pode ser otimizado e fusionado pelo serviço do Dataflow para uma execução eficiente:

O gráfico de execução do programa de exemplo WordCount otimizado e com etapas mescladas
              pelo serviço Dataflow.
Figura 2: gráfico de execução otimizado do exemplo WordCount

Como evitar a fusão

Em alguns casos no pipeline, é recomendável evitar que o serviço do Dataflow faça otimizações de fusão. São casos em que o serviço do Dataflow deduz incorretamente o meio ideal de fusionar operações no pipeline, limitando a capacidade do serviço do Dataflow de usar todos os workers disponíveis.

Por exemplo, um caso em que a fusão pode limitar a capacidade do Dataflow de otimizar o uso de workers é em uma ParDo de "alta distribuição de dados". Nessa operação, pode haver uma coleção de entrada com um número relativamente baixo de elementos, mas a ParDo produz uma saída com centenas ou milhares de elementos a mais, seguida por outra ParDo. Se o serviço do Dataflow fusionar essas operações ParDo em conjunto, o paralelismo nesta etapa será limitado a, no máximo, o número de itens da coleção de entrada, mesmo que a PCollection intermediária contenha muitos outros elementos.

Para evitar essa fusão, inclua no pipeline uma operação que obrigue o serviço do Dataflow a materializar a PCollection intermediária. Use uma das seguintes operações:

  • Insira uma GroupByKey e desagrupe depois da primeira ParDo. O serviço do Dataflow nunca faz a fusão de operações ParDo em uma agregação.
  • Transmita a PCollection intermediária como uma entrada secundária (em inglês) para outra ParDo. O serviço do Dataflow sempre materializa entradas secundárias.
  • Você pode inserir uma etapa Reshuffle. Reshuffle evita a fusão, verifica os dados e executa a eliminação de duplicação de registros. A reorganização é compatível com o Dataflow, mesmo que esteja marcada como obsoleta na documentação do Apache Beam.

Otimização de combinação

As operações de agregação são um conceito importante no processamento de dados em grande escala. A agregação reúne dados que são muito diferentes em termos de conceito, tornando-a extremamente útil para a correlação. O modelo de programação do Dataflow representa as operações de agregação como as transformações GroupByKey, CoGroupByKey e Combine.

As operações de agregação do Dataflow combinam dados de todo o conjunto de dados, inclusive os que podem ser distribuídos para vários workers. Durante essas operações, geralmente é mais eficiente combinar localmente o máximo possível de dados antes de combiná-los entre instâncias. Quando você aplica uma GroupByKey ou outra transformação de agregação, o serviço do Dataflow faz automaticamente a combinação parcial local antes da operação de agrupamento principal.

Na combinação parcial ou de vários níveis, o serviço Dataflow se orienta de acordo com o modo de execução do pipeline, ou seja, lote ou streaming. Para os dados limitados, o serviço favorece a eficiência e faz o máximo possível de combinações locais. Para dados ilimitados, o serviço favorece a latência mais baixa e não faz combinações parciais, já que isso eleva a latência.

Recurso de ajuste automático

O serviço Dataflow contém vários recursos de ajuste automático que podem otimizar de modo ainda mais dinâmico o job do Dataflow durante a execução. Esses recursos incluem o Escalonamento automático e o Reequilíbrio dinâmico de trabalho (links em inglês).

Escalonamento automático

Com o escalonamento automático ativado, o serviço Dataflow seleciona automaticamente o número apropriado de instâncias de trabalhador necessárias para executar o job. O serviço Dataflow pode também realocar dinamicamente mais ou menos trabalhadores durante o tempo de execução de acordo com as características do job. Algumas partes do pipeline podem ser mais pesadas do que outras em termos computacionais, e o serviço do Dataflow pode ativar automaticamente mais workers durante essas fases do job e encerrá-los quando não forem mais necessários.

Java: SDK 2.x

O escalonamento automático é ativado por padrão em todos os batches de jobs do Dataflow e de streaming usando o Streaming Engine. Para desativar o escalonamento automático, especifique a sinalização --autoscalingAlgorithm=NONE quando executar o pipeline. Nesse caso, observe que o serviço do Dataflow define o número de workers com base na opção --numWorkers, que é 3, por padrão.

Com o escalonamento automático ativado, o serviço do Dataflow não permite que o usuário tenha controle do número exato de instâncias de worker alocadas para o job. Também é possível limitar o número de workers. Para isso, especifique a opção --maxNumWorkers ao executar o pipeline.

Para jobs em lote, a sinalização --maxNumWorkers é opcional. O padrão é 1000. Para jobs de streaming, usando o Streaming Engine, a sinalização --maxNumWorkers é opcional. O padrão é 100. Para jobs de streaming que não usam o Streaming Engine, a sinalização --maxNumWorkers é obrigatória.

Python

O escalonamento automático é ativado por padrão em todos os jobs em lote do Dataflow criados com a versão 0.5.1 ou superior do SDK do Apache Beam para Python. Para desativar o escalonamento automático, especifique explicitamente a sinalização --autoscaling_algorithm=NONE quando executar o pipeline. Nesse caso, observe que o serviço do Dataflow define o número de workers com base na opção --num_workers, que é 3, por padrão.

Java: SDK 1.x

O Dataflow é escalonável com base no paralelismo de um pipeline. O paralelismo de um pipeline é uma estimativa do número de linhas de execução necessárias para processar os dados com mais eficiência em um determinado momento.

Escalonamento automático em lote

Para os pipelines em lote, o Dataflow escolhe automaticamente o número de workers com base na quantidade total estimada de trabalho em cada estágio do pipeline, que depende do tamanho da entrada e da capacidade atual. O Dataflow reavalia a quantidade de trabalho de acordo com o progresso da execução a cada 30 segundos, além de aumentar ou diminuir dinamicamente o número de workers à medida que a quantidade total de trabalho estimada aumenta ou diminui.

Se alguma das seguintes condições ocorrer, para salvar recursos inativos, o Dataflow manterá ou diminuirá o número de workers:

  • O uso médio de CPU do worker é menor que 5%.
  • O paralelismo é limitado devido a trabalhos não paralelos, como dados não divisíveis, como arquivos compactados ou dados processados por módulos de E/S que não são divididos.
  • O número de paralelismo é fixo, como gravação em arquivos atuais em um destino do Cloud Storage.
  • Se o pipeline usa uma origem de dados personalizada implementada por você, alguns métodos podem ser implementados para fornecer mais informações ao algoritmo de escalonamento automático do serviço do Dataflow e melhorar potencialmente o desempenho:

    Java: SDK 2.x

    • Na subclasse BoundedSource, implemente o método getEstimatedSizeBytes. O serviço do Dataflow usa getEstimatedSizeBytes ao calcular o número inicial de workers que serão usados no pipeline.
    • Na subclasse BoundedReader, implemente o método getFractionConsumed. O serviço do Dataflow usa getFractionConsumed para acompanhar o progresso de leitura e convergir para o número correto de workers que serão usados durante uma leitura.

    Python

    • Na subclasse BoundedSource, implemente o método estimate_size. O serviço do Dataflow usa estimate_size ao calcular o número inicial de workers que serão usados no pipeline.
    • Na subclasse RangeTracker, implemente o método fraction_consumed. O serviço do Dataflow usa fraction_consumed para acompanhar o progresso de leitura e convergir para o número correto de workers que serão usados durante uma leitura.

    Java: SDK 1.x

    Escalonamento automático de streaming

    Com o escalonamento automático de streaming, o serviço do Dataflow altera de maneira adaptável o número de workers usados para executar o pipeline de streaming em resposta às mudanças na utilização de recursos e de carga. O escalonamento automático de streaming é oferecido gratuitamente e projetado para reduzir os custos dos recursos usados na execução de pipelines de streaming.

  • Escalonamento vertical: se um pipeline de streaming permanecer em backlog com os workers utilizando, em média, mais de 20% das CPUs por alguns minutos, o Dataflow será escalonado verticalmente. Os destinos do Dataflow limpam o backlog em aproximadamente 150 segundos após o escalonamento vertical, dada a capacidade atual por worker.
  • Redução de escalonamento vertical: se um backlog de pipeline de streaming for menor que 10 segundos e os workers estiverem utilizando, em média, menos de 75% das CPUs por um período de alguns minutos, o Dataflow reduzirá o escalonamento vertical. Após a redução, os workers utilizam, em média, 75% das CPUs. Em jobs de streaming que não usam o Streaming Engine, às vezes a utilização de 75% da CPU não pode ser alcançada devido à distribuição do disco e um destino mais baixo é usado.
  • Sem escalonamento: se não houver backlog, mas o uso da CPU for 75% ou superior, o pipeline não reduz o escalonamento vertical. Se houver backlog, mas o uso da CPU for inferior a 20%, o pipeline não escalonará verticalmente.
  • Quando o escalonamento automático não é usado, é possível escolher um número fixo de workers especificando numWorkers ou num_workers para executar o pipeline. Como a carga de trabalho de entrada varia com o passar do tempo, esse número pode ficar muito alto ou muito baixo. O provisionamento de workers em excesso resulta em custo adicional desnecessário, e o contrário resulta em latência mais alta para os dados processados. Com a ativação do escalonamento automático, os recursos são usados somente quando necessário.

    O objetivo do escalonamento automático de pipelines de streaming é minimizar o backlog e maximizar a capacidade e utilização de workers, além de reagir rapidamente aos picos de carga. Com o escalonamento automático ativado, você não precisa escolher entre provisionamento para carga de pico e atualização de resultados. Os workers são adicionados conforme a utilização de CPU e o backlog aumentam e são removidos quando essas métricas caem. Dessa forma, você paga apenas pelo que for necessário, e o job é processado com a máxima eficiência possível.

    Java: SDK 2.x

    Fontes personalizadas ilimitadas

    Se o pipeline usa uma fonte personalizada ilimitada, essa fonte precisa informar o serviço do Dataflow sobre o backlog. O backlog é uma estimativa da entrada em bytes que ainda não foi processada pela fonte. Para informar o serviço sobre o backlog, implemente um dos métodos a seguir na classe UnboundedReader.

    • getSplitBacklogBytes(): backlog para a divisão atual da fonte. O serviço agrega o backlog em todas as divisões.
    • getTotalBacklogBytes(): backlog global em todas as divisões. Em alguns casos, o backlog não está disponível para cada divisão e só pode ser calculado em todas elas. Somente a primeira divisão (com ID "0") precisa fornecer o backlog total.
    O repositório do Apache Beam contém vários exemplos (em inglês) de fontes personalizadas que implementam a classe UnboundedReader.
    Ativar o escalonamento automático de streaming

    Para jobs de streaming que usam o Streaming Engine, o escalonamento automático é ativado por padrão.

    Para ativar o escalonamento automático de jobs que não usam o Streaming Engine, defina os seguintes parâmetros de execução ao iniciar o pipeline:

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

    Para jobs de streaming que não usam o Streaming Engine, o número mínimo de workers é 1/15 do valor --maxNumWorkers, arredondado.

    Os pipelines de streaming são implantados com um pool fixo de discos permanentes em mesmo número de --maxNumWorkers. Leve isso em consideração quando especificar --maxNumWorkers e verifique se esse valor é um número suficiente de discos para o pipeline.

    Uso e preços

    O cálculo da utilização do Compute Engine é baseado no número médio de workers e da utilização de discos permanentes, no valor exato de --maxNumWorkers. Esses discos são redistribuídos de modo que cada worker tenha um número igual de discos anexados.

    No exemplo acima, em que --maxNumWorkers=15, seria cobrado o preço de 1 a 15 instâncias do Compute Engine e de exatamente 15 discos permanentes.

    Python

    Ativar o escalonamento automático de streaming

    Para ativar o escalonamento automático, defina os seguintes parâmetros de execução quando iniciar o pipeline:

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

    Para jobs de streaming que não usam o Streaming Engine, o número mínimo de workers é 1/15 do valor --maxNumWorkers, arredondado.

    Os pipelines de streaming são implantados com um pool fixo de discos permanentes em mesmo número de --maxNumWorkers. Leve isso em consideração quando especificar --maxNumWorkers e verifique se esse valor é um número suficiente de discos para o pipeline.

    Uso e preços

    O cálculo da utilização do Compute Engine é baseado no número médio de workers e da utilização de discos permanentes, no valor exato de --max_num_workers. Esses discos são redistribuídos de modo que cada worker tenha um número igual de discos anexados.

    No exemplo acima, em que --max_num_workers=15, seria cobrado o preço de 1 a 15 instâncias do Compute Engine e de exatamente 15 discos permanentes.

    Java: SDK 1.x

    Como escalonar manualmente um pipeline de streaming

    Até o escalonamento automático ficar disponível no modo de streaming, escalone manualmente o número de trabalhadores que executam o canal de streaming usando o recurso Atualizar do Dataflow.

    Java: SDK 2.x

    Para escalonar o pipeline de streaming durante a execução, defina os parâmetros de execução a seguir ao iniciar o pipeline:

    • Defina --maxNumWorkers como o número máximo de workers que você quer disponibilizar para o pipeline.
    • Defina --numWorkers como o número inicial de workers que você quer que o pipeline use quando começar a ser executado.

    Atualize o pipeline quando ele já estiver em execução e especifique um novo número de workers usando o parâmetro --numWorkers. É necessário que o novo valor de --numWorkers esteja entre N e --maxNumWorkers, em que N é igual a --maxNumWorkers/15.

    A atualização substitui o job em execução por um novo, usando o novo número de workers, mas preservando todas as informações de estado associadas ao job anterior.

    Python

    Para escalonar o pipeline de streaming durante a execução, defina os parâmetros de execução a seguir ao iniciar o pipeline:

    • Defina --max_num_workers como o número máximo de workers que você quer disponibilizar para o pipeline.
    • Defina --num_workers como o número inicial de workers que você quer que o pipeline use quando começar a ser executado.

    Atualize o pipeline quando ele já estiver em execução e especifique um novo número de workers usando o parâmetro --num_workers. É necessário que o novo valor de --num_workers esteja entre N e --max_num_workers, em que N é igual a --max_num_workers/15.

    A atualização substitui o job em execução por um novo, usando o novo número de workers, mas preservando todas as informações de estado associadas ao job anterior.

    Java: SDK 1.x

    Reequilíbrio de trabalho dinâmico

    Com o recurso Reequilíbrio dinâmico de trabalho do serviço Dataflow, o serviço reparticiona dinamicamente o trabalho baseado nas condições do tempo de execução. Essas condições incluem:

    • Desequilíbrio nas atribuições de trabalho
    • Workers levam mais tempo do que o estimado para finalizar
    • os trabalhadores finalizam antes do tempo estimado

    O serviço Dataflow detecta automaticamente essas condições e reatribui o trabalho para trabalhadores subutilizados ou não utilizados a fim de diminuir o tempo de processamento geral do job.

    Limitações

    O reequilíbrio dinâmico de trabalho acontece apenas quando o serviço do Dataflow está processando dados de entrada em paralelo: ao ler dados de uma fonte de entrada externa, trabalhar com uma PCollection intermediária materializada ou trabalhar com o resultado de uma operação de agregação, como GroupByKey. Se ocorrer a fusão de um grande número de etapas do job, haverá menos PCollections no job e o Reequilíbrio dinâmico de trabalho será limitado ao número de elementos na PCollection materializada da fonte. Para garantir que o Reequilíbrio dinâmico de trabalho seja aplicado a uma determinada PCollection no pipeline, há algumas maneiras diferentes de evitar a fusão para garantir o carregamento em paralelo dinâmico.

    O reequilíbrio dinâmico de trabalho não pode recarregar em paralelo dados mais refinados do que um único registro. Se os dados contêm registros individuais que causam grandes atrasos no tempo de processamento, podem ainda atrasar o job, já que o Dataflow não pode subdividir e redistribuir um registro individual "ativo" para vários workers.

    Java: SDK 2.x

    Se você definiu um número fixo de fragmentos para a saída final do pipeline (por exemplo, efetuando a gravação de dados usando TextIO.Write.withNumShards), o carregamento em paralelo será limitado com base no número de fragmentos escolhidos.

    Python

    Se você definiu um número fixo de fragmentos para a saída final do pipeline (por exemplo, efetuando a gravação de dados usando beam.io.WriteToText(..., num_shards=...)), o carregamento em paralelo será limitado pelo Dataflow com base no número de fragmentos escolhidos.

    Java: SDK 1.x

    A limitação de fragmentos fixos pode ser considerada temporária e estar sujeita a alterações nas futuras versões do serviço Dataflow.

    Como trabalhar com fontes de dados personalizadas

    Java: SDK 2.x

    Se o pipeline usar uma fonte de dados personalizada fornecida por você, implemente o método splitAtFraction para que ela funcione com o recurso Reequilíbrio dinâmico de trabalho.

    Python

    Se o pipeline usar uma fonte de dados personalizada fornecida por você, seu RangeTracker precisa implementar try_claim, try_split, position_at_fraction e fraction_consumed para que ela funcione com o recurso Reequilíbrio dinâmico de trabalho.

    Consulte Informações de referência da API sobre RangeTracker para saber mais.

    Java: SDK 1.x

    Gerenciamento e uso de recursos

    O serviço do Dataflow gerencia totalmente os recursos do Google Cloud por job. Isso inclui ativar e encerrar instâncias do Compute Engine, às vezes chamadas deworkers ou VMs, e acessar buckets no Cloud Storage do seu projeto para E/S e preparo de arquivos temporários. No entanto, se o pipeline interagir com as tecnologias de armazenamento de dados do Google Cloud, como o BigQuery e o Pub/Sub, será necessário gerenciar os recursos e a cota desses serviços.

    O Dataflow usa um local fornecido pelo usuário no Cloud Storage especificamente para o preparo de arquivos. Esse local está sob seu controle. É preciso garantir que o tempo de vida do local dure enquanto os jobs fazem leitura nele. O mesmo local de preparação pode ser reutilizado em execuções de vários jobs, pois o armazenamento em cache integrado do SDK pode acelerar o tempo de início dos jobs.

    Vagas

    É possível executar até 25 jobs simultâneos do Dataflow por projeto do Google Cloud. No entanto, esse limite pode ser aumentado entrando em contato com o suporte do Google Cloud. Para mais informações, consulte Cotas.

    No momento, o serviço do Dataflow está limitado a processar solicitações de jobs JSON com 20 MB ou menos. O tamanho da solicitação de job está expressamente vinculado à representação JSON do pipeline. Um pipeline maior aceita solicitações maiores.

    Para estimar o tamanho da solicitação JSON do pipeline, execute-o com a seguinte opção:

    Java: SDK 2.x

    --dataflowJobFile=< path to output file >
    

    Python

    --dataflow_job_file=< path to output file >
    

    Java: SDK 1.x

    Esse comando grava uma representação JSON do job em um arquivo. O tamanho do arquivo serializado é uma boa estimativa do tamanho da solicitação. O tamanho real é ligeiramente maior em função de algumas informações adicionais incluídas na solicitação.

    Veja mais informações na página de solução de problemas "413 Entidade de solicitação muito grande" / "O tamanho da representação JSON serializada do pipeline excede o limite permitido".

    Além disso, o tamanho do gráfico do job precisa ter menos de 10 MB. Para mais informações, consulte a página da solução do problema "O gráfico do job é muito grande. Tente novamente com um gráfico menor ou divida o job em dois ou mais jobs menores.".

    Workers

    No momento, o serviço do Dataflow comporta, no máximo, 1.000 instâncias do Compute Engine por job. Para jobs em lote, o tipo de máquina padrão é n1-standard-1. Para jobs de streaming, o tipo de máquina padrão para jobs de Streaming Engine-enabled é n1-standard-2 e o tipo de máquina padrão para jobs non-Streaming Engine é n1-standard-4. Portanto, quando você usa os tipos de máquina padrão, o serviço Dataflow aloca até 4.000 núcleos por job. Se você precisar de mais núcleos para o job, poderá selecionar um tipo de máquina maior.

    Você pode usar qualquer uma das famílias de tipos de máquina do Compute Engine disponíveis, bem como tipos de máquinas personalizadas. Para melhores resultados, use tipos de máquina n1. Os tipos de máquina com núcleo compartilhado, como workers da série f1 e g1, não recebem suporte de acordo com o Contrato de nível de serviço do Dataflow.

    O Dataflow cobra pelo número de vCPUs e GB de memória nos workers. O faturamento não depende da família de tipos de máquinas. Para especificar um tipo de máquina para o pipeline, defina o parâmetro de execução apropriado no momento da criação do pipeline.

    Java: SDK 2.x

    Para alterar o tipo de máquina, defina a opção --workerMachineType.

    Python

    Para alterar o tipo de máquina, defina a opção --worker_machine_type.

    Java: SDK 1.x

    Cota de recursos

    O serviço do Dataflow faz verificações para garantir que o projeto do Google Cloud tenha a cota de recursos do Compute Engine necessária para executar o job, tanto para iniciar quanto para escalonar o job para o número máximo de instâncias de worker. O job não será iniciado se não houver cota de recursos suficiente.

    O recurso de Escalonamento automático é limitado pela cota disponível do Compute Engine do projeto. Se o job tiver cota suficiente quando for iniciado e um outro job usar o restante da cota disponível do projeto, o primeiro job será executado, mas não será totalmente escalonado.

    No entanto, o serviço Dataflow não gerencia aumentos de cota dos jobs que excedem as cotas de recursos do projeto. Todas as solicitações necessárias para a cota de recursos adicionais ficam sob a responsabilidade do usuário. Para realizar essa tarefa, é possível usar o Console do Google Cloud.

    Recursos de disco permanente

    O serviço do Dataflow está limitado atualmente a 15 discos permanentes por instância de worker na execução de um job de streaming. Cada disco permanente é local a uma máquina virtual individual do Compute Engine. O job não pode ter mais workers do que discos permanentes. A proporção de 1:1 entre workers e discos é a cota mínima de recursos.

    Para jobs em execução em VMs de worker, o tamanho padrão de cada disco permanente é 250 GB no modo em lote e 400 GB no modo de streaming. Os jobs que usam o Streaming Engine ou o Dataflow Shuffle são executados no back-end do serviço do Dataflow e usam discos menores.

    Locais

    Por padrão, o serviço do Dataflow implanta recursos do Compute Engine na zona us-central1-f da região us-central1. Para modificar essa configuração, é necessário especificar o parâmetro --region. Se você precisar usar recursos em uma zona específica, utilize o parâmetro --zone ao criar o pipeline. No entanto, recomendamos especificar apenas a região e não definir a zona. Assim, o serviço do Dataflow seleciona automaticamente a melhor zona dentro da região, com base na capacidade disponível no momento da solicitação de criação do job. Para mais informações, consulte a documentação sobre endpoints regionais.

    Streaming Engine

    Atualmente, o sistema de execução de pipelines do Dataflow executa as etapas do pipeline de streaming inteiramente em máquinas virtuais de worker e consome CPU, memória e armazenamento de disco permanente do worker. O Streaming Engine do Dataflow move a execução do pipeline das VMs de worker para o back-end do serviço do Dataflow.

    Benefícios do Streaming Engine

    O modelo do Streaming Engine tem os seguintes benefícios:

    • Redução nos recursos consumidos de CPU, memória e armazenamento de disco permanente nas VMs de worker. O Streaming Engine funciona melhor com tipos de máquina de worker menores (n1-standard-2, em vez de n1-standard-4) e não requer discos permanentes a mais além de um disco pequeno para inicialização do worker, o que reduz o consumo de recursos e cotas.
    • Escalonamento automático com mais resposta a variações no volume de dados recebidos. O Streaming Engine oferece uma transição de escalonamento mais suave e granular dos workers.
    • Melhor suporte, já que você não precisa reimplantar seus pipelines para aplicar atualizações de serviço.

    A maior parte da redução nos recursos do worker é resultado da transferência do trabalho para o serviço do Dataflow. Por esse motivo, usar o Streaming Engine gera uma cobrança. No entanto, espera-se que o faturamento total dos pipelines do Dataflow que usam o Streaming Engine seja quase igual ao daqueles que não usam essa opção.

    Como usar o Streaming Engine

    O Streaming Engine está disponível para streaming de pipelines nas regiões abaixo. No futuro, ele estará disponível em mais regiões.

    • asia-east1 (Taiwan)
    • asia-east2 (Hong Kong)
    • asia-northeast1 (Tóquio)
    • asia-northeast2 (Osaka)
    • asia-northeast3 (Seul)
    • asia-south1 (Mumbai)
    • asia-southeast1 (Singapura)
    • asia-southeast2 (Jacarta)
    • australia-southeast1 (Sydney)
    • europe-north1 (Finlândia)
    • europe-west1 (Bélgica)
    • europe-west2 (Londres)
    • europe-west3 (Frankfurt)
    • europe-west4 (Países Baixos)
    • europe-west6 (Zurique)
    • northamerica-northeast1 (Montreal)
    • southamerica-east1 (São Paulo)
    • us-east1 (Carolina do Sul)
    • us-east4 (Norte da Virgínia)
    • us-central1 (Iowa)
    • us-west1 (Oregon)
    • us-west2 (Los Angeles)
    • us-west3 (Salt Lake City)
    • us-west4 (Las Vegas)

    Java: SDK 2.x

    Para usar o Streaming Engine em pipelines de streaming, especifique o seguinte parâmetro:

    • --enableStreamingEngine, se você estiver usando a versão 2.11.0 ou superior do SDK do Apache Beam para Java.
    • --experiments=enable_streaming_engine, se você estiver usando a versão 2.10.0 do SDK do Apache Beam para Java.

    Se você usar o Dataflow Streaming Engine no pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Streaming Engine está disponível no momento. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    O Streaming Engine funciona melhor com tipos de máquinas de worker menores. Por isso, recomendamos que você defina --workerMachineType=n1-standard-2. Também é possível definir --diskSizeGb=30 porque o Streaming Engine precisa de espaço apenas para a imagem de inicialização do worker e os registros locais. Esses valores são padrão.

    Python

    O Streaming Engine é ativado por padrão para novos pipelines de streaming do Dataflow quando as seguintes condições são atendidas:

    Se quiser desativar o Streaming Engine no pipeline de streaming do Python, especifique o parâmetro a seguir:

    --experiments=disable_streaming_engine

    Se você usar o Python 2, ainda precisará ativar o Streaming Engine especificando o seguinte parâmetro:

    --enable_streaming_engine

    Se você usar o Dataflow Streaming Engine no seu pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Streaming Engine está disponível no momento. O Dataflow selecionará automaticamente a zona na região que você especificou. O Dataflow apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    O Streaming Engine funciona melhor com tipos de máquinas de worker menores. Por isso, recomendamos que você defina --machine_type=n1-standard-2. Também é possível definir --disk_size_gb=30 porque o Streaming Engine precisa de espaço apenas para a imagem de inicialização do worker e os registros locais. Esses valores são padrão.

    Java: SDK 1.x

    Dataflow Shuffle

    O Dataflow Shuffle é a operação básica por trás das transformações do Dataflow, como GroupByKey, CoGroupByKey e Combine. A operação do Dataflow Shuffle particiona e agrupa os dados por chave de maneira escalonável, eficiente e tolerante a falhas. Atualmente, o Dataflow usa uma implementação de embaralhamento que é executada totalmente em máquinas virtuais de worker e consome CPU, memória e armazenamento em disco permanente do worker. O recurso Dataflow Shuffle baseado em serviços, disponível somente para pipelines em lote, move a operação de embaralhamento das VMs de worker para o back-end do serviço do Dataflow.

    Benefícios do Dataflow Shuffle

    O Dataflow Shuffle baseado em serviços oferece os seguintes benefícios:

    • Tempo de execução mais rápido de pipelines em lote para a maioria dos tipos de job de pipeline.
    • Redução nos recursos consumidos de CPU, memória e armazenamento de disco permanente nas VMs de trabalho.
    • Melhor escalonamento automático, já que as VMs não têm mais dados embaralhados e, portanto, podem ser reduzidas mais cedo.
    • Melhor tolerância a falhas. Uma VM não íntegra que detiver dados do Dataflow Shuffle não causará falha em todo o job, como aconteceria se o recurso não estivesse sendo usado.

    A maior parte da redução nos recursos do worker é motivada pela transferência do trabalho de embaralhamento para o serviço do Dataflow. Por esse motivo, há uma cobrança associada ao uso do Dataflow Shuffle. No entanto, espera-se que o faturamento total dos pipelines que usam a implementação do Dataflow com base em serviço seja menor ou igual ao daqueles que não usam essa opção.

    Para a maioria dos tipos de job de pipeline, espera-se que o Dataflow Shuffle seja executado mais rapidamente do que a implementação de embaralhamento em execução nas VMs de worker. No entanto, os tempos de execução podem variar. Se você estiver executando um pipeline com prazos importantes, recomendamos alocar tempo de espera suficiente antes do prazo final. Além disso, considere solicitar uma cota maior para o Shuffle.

    Considerações sobre disco

    Ao usar o recurso Dataflow Shuffle com base em serviço, não é necessário anexar discos permanentes grandes às VMs de worker. O Dataflow anexa automaticamente um disco de inicialização pequeno de 25 GB. No entanto, devido ao tamanho reduzido desse disco, há considerações importantes a serem observadas ao usar o Dataflow Shuffle.

    • Uma VM de worker usa parte dos 25 GB de espaço em disco para o sistema operacional, binários, registros e contêineres. Quando você usa o Dataflow Shuffle, os jobs que usam uma quantidade significativa de disco e excedem a capacidade restante podem falhar.
    • Jobs que usam muita E/S de disco podem ser lentos devido ao desempenho do disco pequeno. Veja mais informações sobre as diferenças de desempenho entre os tamanhos de disco na página Desempenho de disco permanente do Compute Engine.

    Se alguma dessas considerações se aplicar ao job, use as opções de pipeline para especificar um tamanho de disco maior.

    Como usar o Dataflow Shuffle

    No momento, o Dataflow Shuffle baseado em serviços está disponível nas seguintes regiões:

    • asia-east1 (Taiwan)
    • asia-east2 (Hong Kong)
    • asia-northeast1 (Tóquio)
    • asia-northeast2 (Osaka)
    • asia-northeast3 (Seul)
    • asia-south1 (Mumbai)
    • asia-southeast1 (Singapura)
    • asia-southeast2 (Jacarta)
    • australia-southeast1 (Sydney)
    • europe-north1 (Finlândia)
    • europe-west1 (Bélgica)
    • europe-west2 (Londres)
    • europe-west3 (Frankfurt)
    • europe-west4 (Países Baixos)
    • europe-west6 (Zurique)
    • northamerica-northeast1 (Montreal)
    • southamerica-east1 (São Paulo)
    • us-east1 (Carolina do Sul)
    • us-east4 (Norte da Virgínia)
    • us-central1 (Iowa)
    • us-west1 (Oregon)
    • us-west2 (Los Angeles)
    • us-west3 (Salt Lake City)
    • us-west4 (Las Vegas)

    No futuro, o Dataflow Shuffle estará disponível em mais regiões.

    Java: SDK 2.x

    Para usar o Dataflow Shuffle baseado em serviço nos pipelines em lote, especifique o seguinte parâmetro:
    --experiments=shuffle_mode=service

    Se você usa o Dataflow Shuffle no pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Shuffle está disponível. O Dataflow selecionará automaticamente a zona na região que você especificou. O serviço apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    Python

    Para usar o Dataflow Shuffle baseado em serviço nos pipelines em lote, especifique o seguinte parâmetro:
    --experiments=shuffle_mode=service

    Se você usa o Dataflow Shuffle no pipeline, não especifique o parâmetro --zone. Em vez disso, especifique o parâmetro --region e defina o valor como uma das regiões em que o Shuffle está disponível. O Dataflow selecionará automaticamente a zona na região que você especificou. O serviço apresentará um erro se você especificar o parâmetro --zone e defini-lo como uma zona fora das regiões disponíveis.

    Java: SDK 1.x

    Programação flexível de recursos do Dataflow

    O Dataflow FlexRS reduz os custos de processamento em lote usando técnicas avançadas de programação, o serviço do Dataflow Shuffle e uma combinação de instâncias de máquina virtual (VM) preemptiva e VMs comuns. Ao executar VMs preemptivas e VMs comuns em paralelo, o Dataflow melhora a experiência do usuário caso o Compute Engine interrompa as instâncias de VM preemptiva durante um evento do sistema. O FlexRS ajuda a garantir que o pipeline continue a progredir e que você não perca o trabalho anterior quando o Compute Engine força a interrupção das VMs preemptivas. Para mais informações sobre o FlexRS, consulte Como usar a programação flexível de recursos no Dataflow.

    Dataflow Runner v2

    O executor atual do Dataflow de produção utiliza workers específicos da linguagem ao executar pipelines do Apache Beam. Para melhorar a escalonabilidade, a generalidade, a extensibilidade e a eficiência, o executor do Dataflow está migrando para uma arquitetura mais baseada em serviços. Essas alterações incluem uma arquitetura de worker mais eficiente e portátil empacotada com o Shuffle Service e o Streaming Engine.

    O novo executor do Dataflow, o Dataflow Runner v2, agora é o padrão para pipelines de streaming em Python (versão 2.21.0 ou mais recentes), e ele vem por padrão em todos os novos pipelines de lote em Python (versão 2.21.0 ou mais recentes) desde fevereiro 2021. Não é necessário fazer alterações no código do pipeline para aproveitar essa nova arquitetura.

    Benefícios do uso do Dataflow Runner v2

    A partir de pipelines de lote e streaming em Python, novos recursos estarão disponíveis apenas no Dataflow Runner v2. Além disso, a eficiência aprimorada da arquitetura do Dataflow Runner v2 pode levar a melhorias de desempenho nos jobs do Dataflow.

    Ao usar o Dataflow Runner v2, você notará uma redução na sua fatura. Como o modelo de faturamento do Dataflow Runner v2 ainda não é final, sua fatura pode voltar aos níveis atuais à medida que o novo executor é ativado em todos os pipelines.

    O Dataflow Runner v2 também permite pré-criar o contêiner Python, o que pode melhorar os tempos de inicialização da VM e o desempenho do escalonamento automático. Para testar esse recurso experimental, ative a API Cloud Build no seu projeto e envie seu pipeline com o seguinte parâmetro:
    --prebuild_sdk_container_engine=cloud_build.

    Quando o job for concluído ou interrompido, remova a imagem pré-criada do Container Registry. O URL da imagem pode ser encontrado na IU de monitoramento do Dataflow em Opções de pipeline.

    Como usar o Dataflow Runner v2

    O Dataflow Runner v2 está disponível em regiões com endpoints regionais do Dataflow. Enquanto o executor estiver sendo lançado, se você quiser testá-lo, use o seguinte:

    Java: SDK 2.x

    O Dataflow Runner v2 não está disponível para Java no momento.

    Python

    O Dataflow Runner v2 requer o Streaming Engine para jobs de streaming e o Dataflow Shuffle para jobs em lote. Para ativá-los para os respectivos jobs, especifique o seguinte parâmetro:
    --experiments=use_runner_v2

    Como depurar jobs do Dataflow Runner v2

    Para depurar jobs usando o Dataflow Runner v2, siga as etapas de depuração padrão. No entanto, esteja ciente do seguinte ao usar o Dataflow Runner v2:

    • Os jobs do Dataflow Runner v2 executam dois tipos de processos na VM do worker: o processo do SDK e o processo de proteção do executor. Dependendo do pipeline e do tipo de VM, pode haver um ou mais processos do SDK, mas há apenas um processo de proteção do executor por VM.
    • Os processos do SDK executam o código do usuário e outras funções específicas da linguagem, enquanto o processo de proteção do executor gerencia todo o restante.
    • O processo de aproveitar o executor aguarda que todos os processos do SDK se conectem a ele antes de começar a solicitar trabalho do Dataflow.
    • Os jobs atrasar se a VM do worker fizer o download e instalar dependências durante a inicialização do processo do SDK. Se houver problemas em um processo do SDK, como inicialização ou instalação de bibliotecas, o worker informará seu status como não íntegro. Se os tempos de inicialização aumentarem, ative a API Cloud Build no seu projeto e envie seu pipeline com o seguinte parâmetro:
      --prebuild_sdk_container_engine=cloud_build.
    • Os registros de VM de worker (disponíveis por meio do Logs Explorer ou da interface de monitoramento do Dataflow) incluem registros do processo de aproveitamento do executor, bem como registros dos processos do SDK.
    • Para diagnosticar problemas no seu código de usuário, examine os registros do worker dos processos do SDK. Se você encontrar erros nos registros da proteção do executor, entre em contato com o suporte para registrar um bug.